[jira] [Commented] (APEXMALHAR-2260) Operator that can execute python code

JIRA jira@apache.org

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413870#comment-16413870 ]

ASF GitHub Bot commented on APEXMALHAR-2260:
--------------------------------------------

tweise closed pull request #683: APEXMALHAR-2260.PythonExecutionOperator-Execute python code as part of DAG
URL: https://github.com/apache/apex-malhar/pull/683
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/pom.xml b/pom.xml
index c8925c4a90..0f62475106 100644
--- a/pom.xml
+++ b/pom.xml
@@ -216,6 +216,7 @@
     <module>contrib</module>
     <module>kafka</module>
     <module>kudu</module>
+    <module>python</module>
     <module>examples</module>
   </modules>
 
diff --git a/python/pom.xml b/python/pom.xml
new file mode 100755
index 0000000000..9a2d13f874
--- /dev/null
+++ b/python/pom.xml
@@ -0,0 +1,94 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar</artifactId>
+    <version>4.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>malhar-python</artifactId>
+  <name>Apex library python support</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <jep-version>3.7.0</jep-version>
+    <jepInstallPath>/usr/local/lib/python3.5/site-packages/jep</jepInstallPath> <!-- Default value only; ideally overridden from the command line based on the environment. A default is set here
+     to avoid errors in IDEs that validate a pom.xml and require a property to be set when it is referenced in the configs -->
+    <disruptor-queue-conversant-media-version>1.2.10</disruptor-queue-conversant-media-version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.core.version}</version>
+      <type>jar</type>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>janino</artifactId>
+      <version>2.7.8</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>black.ninia</groupId>
+      <artifactId>jep</artifactId>
+      <version>${jep-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.conversantmedia</groupId>
+      <artifactId>disruptor</artifactId>
+      <version>${disruptor-queue-conversant-media-version}</version>
+      <classifier>jdk7</classifier><!-- Set classifier to jdk8 when malhar switches to JDK 8 -->
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkMode>once</forkMode>
+          <argLine>-Djava.library.path=${jepInstallPath}</argLine>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonEngine.java b/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonEngine.java
new file mode 100644
index 0000000000..6370e32183
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonEngine.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+
+/**
+ * Defines the methods that need to be implemented by Python engine implementations. The first implementation
+ *  takes the approach of an in-memory interpreter using JEP. Another possibility is Py4J, which is an inter-process
+ *   communication model. An Apex operator would use an instance of a Python engine implementation to run
+ * python code using the chosen engine.
+ */
+public interface ApexPythonEngine
+{
+  /***
+   * Used to perform any pre interpreter processing.
+   * @param preInitConfigs The configuration that is going to be used by the interpreter
+   * @throws ApexPythonInterpreterException if there is an issue in executing the pre interpreter logic
+   */
+  void preInitInterpreter(Map<PythonInterpreterConfig,Object> preInitConfigs) throws ApexPythonInterpreterException;
+
+  /***
+   * Starts the interpreter.
+   * @throws ApexPythonInterpreterException if library not locatable or any other issue starting the interpreter
+   */
+  void startInterpreter() throws ApexPythonInterpreterException;
+
+  /***
+   * Used to perform any logic that needs to be executed after the interpreter is started but before any tuples start
+   *  getting processed. For example, setting the starting state of the variables that are used in tuple processing.
+   * @throws ApexPythonInterpreterException
+   */
+  void postStartInterpreter() throws ApexPythonInterpreterException;
+
+  /***
+   * Runs a series of commands. The implementation engine could make use of a worker pool to execute the command.
+   * @param executionMode Whether these commands need to be run on all worker threads or on any one of them. Please see
+   *                       {@link WorkerExecutionMode} for choices available
+   * @param windowId used to select the worker from the worker thread pool. This parameter gains significance if
+   *                 we want to implement a sticky worker in the near future. This will allow for a basic approach to
+   *                 route the command/s to the same worker if the application logic needs it to be. In the case of ANY
+   *                 worker logic, the window ID along with the Request ID is used to implement a round robin approach
+   *                 to select the next worker. Note that a sticky worker might be required since python interpreter
+   *                 state accumulates as the commands run and a command can reference a variable created in a
+   *                 previous command. Such references might require routing all commands to a specific interpreter
+   *                 instance. If the Apex python engine is not being used by an operator implementation directly,
+   *                 the caller can pass in any number as it is not used in anything more than selecting a worker from a
+   *                 worker pool.
+   * @param requestId The parameter is used to select a worker from the
+   *                  worker pool along with the window Id. If the Apex python engine is not being used by an
+   *                  operator implementation directly, the caller can pass in any number as it is not used in anything
+   *                  more than selecting a worker from a worker pool.
+   * @param request Represents the request to be processed.
+   * @return A map with the command run as key and the corresponding {@link PythonRequestResponse} as value, which
+   *  indicates whether the command ran successfully.
+   * @throws ApexPythonInterpreterException if interrupted or if the command cannot be executed
+   */
+  Map<String,PythonRequestResponse<Void>> runCommands(WorkerExecutionMode executionMode, long windowId, long requestId,
+      PythonInterpreterRequest<Void> request) throws ApexPythonInterpreterException;
+
+  /***
+   * Executes a method call
+   * @param executionMode If the method call needs to be invoked on all workers or any single worker. Please see
+   *                       {@link WorkerExecutionMode} for choices available
+   * @param windowId used to select the worker from the worker thread pool. This parameter gains significance if
+   *                 we want to implement a sticky worker in the near future. This will allow for a basic approach to
+   *                 route the command/s to the same worker if the application logic needs it to be. In the case of ANY
+   *                 worker logic, the window ID along with the Request ID is used to implement a round robin approach
+   *                 to select the next worker. Note that a sticky worker might be required since python interpreter
+   *                 state accumulates as the commands run and a command can reference a variable created in a
+   *                 previous command. Such references might require routing all commands to a specific interpreter
+   *                 instance. If the Apex python engine is not being used by an operator implementation directly,
+   *                 the caller can pass in any number as it is not used in anything more than selecting a worker from a
+   *                 worker pool.
+   * @param requestId The parameter is used to select a worker from the
+   *                  worker pool along with the window Id. If the Apex python engine is not being used by an
+   *                  operator implementation directly, the caller can pass in any number as it is not used in anything
+   *                  more than selecting a worker from a worker pool.
+   * @param req Represents the request to be processed.
+   * @param <T>
+   * @return A map containing the worker ID as key and the {@link PythonRequestResponse} of that worker as value
+   * @throws ApexPythonInterpreterException
+   */
+  <T> Map<String,PythonRequestResponse<T>> executeMethodCall(WorkerExecutionMode executionMode,long windowId,
+      long requestId, PythonInterpreterRequest<T> req) throws ApexPythonInterpreterException;
+
+  /***
+   * Executes a script that is locatable via a file path
+   * @param executionMode  If the method call needs to be invoked on all workers or any single worker. Please see
+   *                       {@link WorkerExecutionMode} for choices available
+   * @param windowId used to select the worker from the worker thread pool. This parameter gains significance if
+   *                 we want to implement a sticky worker in the near future. This will allow for a basic approach to
+   *                 route the command/s to the same worker if the application logic needs it to be. In the case of ANY
+   *                 worker logic, the window ID along with the Request ID is used to implement a round robin approach
+   *                 to select the next worker. Note that a sticky worker might be required since python interpreter
+   *                 state accumulates as the commands run and a command can reference a variable created in a
+   *                 previous command. Such references might require routing all commands to a specific interpreter
+   *                 instance. If the Apex python engine is not being used by an operator implementation directly,
+   *                 the caller can pass in any number as it is not used in anything more than selecting a worker from a
+   *                 worker pool.
+   * @param requestId The parameter is used to select a worker from the
+   *                  worker pool along with the window Id. If the Apex python engine is not being used by an
+   *                  operator implementation directly, the caller can pass in any number as it is not used in anything
+   *                  more than selecting a worker from a worker pool.
+   * @param request Represents the request to be processed.
+   * @return A map containing the worker ID as key and the {@link PythonRequestResponse} of that worker as value
+   * @throws ApexPythonInterpreterException
+   */
+  Map<String,PythonRequestResponse<Void>>  executeScript(WorkerExecutionMode executionMode,long windowId,long requestId,
+      PythonInterpreterRequest<Void> request) throws ApexPythonInterpreterException;
+
+  /***
+   * Evaluates a string as a python expression and also supports passing in variables from JVM to the python interpreter
+   * @param executionMode If the method call needs to be invoked on all workers or any single worker. Please see
+   *                       {@link WorkerExecutionMode} for choices available
+   * @param windowId used to select the worker from the worker thread pool. This parameter gains significance if
+   *                 we want to implement a sticky worker in the near future. This will allow for a basic approach to
+   *                 route the command/s to the same worker if the application logic needs it to be. In the case of ANY
+   *                 worker logic, the window ID along with the Request ID is used to implement a round robin approach
+   *                 to select the next worker. Note that a sticky worker might be required since python interpreter
+   *                 state accumulates as the commands run and a command can reference a variable created in a
+   *                 previous command. Such references might require routing all commands to a specific interpreter
+   *                 instance. If the Apex python engine is not being used by an operator implementation directly,
+   *                 the caller can pass in any number as it is not used in anything more than selecting a worker from a
+   *                 worker pool.
+   * @param requestId The parameter is used to select a worker from the
+   *                  worker pool along with the window Id. If the Apex python engine is not being used by an
+   *                  operator implementation directly, the caller can pass in any number as it is not used in anything
+   *                  more than selecting a worker from a worker pool.
+   * @param req Represents the request to be processed.
+   * @param <T> Java templating signature
+   * @return A map containing the worker ID as key and the {@link PythonRequestResponse} of that worker as value
+   * @throws ApexPythonInterpreterException
+   */
+  <T> Map<String,PythonRequestResponse<T>> eval(WorkerExecutionMode executionMode, long windowId, long requestId,
+      PythonInterpreterRequest<T> req)  throws ApexPythonInterpreterException;
+
+  /***
+   * @return The queue that holds all of the straggler responses.
+   */
+  BlockingQueue<PythonRequestResponse> getDelayedResponseQueue();
+
+  void setDelayedResponseQueue(BlockingQueue<PythonRequestResponse> delayedResponseQueue);
+
+  /***
+   * @return The number of times the engine could not process a request as there were no free worker threads and hence
+   *  had to return null
+   */
+  long getNumStarvedReturns();
+
+
+  void setNumStarvedReturns(long numStarvedReturns);
+
+  /**
+   * Returns all of the commands that were executed on all of the worker nodes.
+   * @return History of all commands executed in sequence
+   */
+  List<PythonRequestResponse> getCommandHistory();
+
+  void setCommandHistory(List<PythonRequestResponse> commandHistory);
+
+  void stopInterpreter() throws ApexPythonInterpreterException;
+
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonInterpreterException.java b/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonInterpreterException.java
new file mode 100644
index 0000000000..be64d3e796
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/ApexPythonInterpreterException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+/**
+ * An exception used to denote issues when using the ApexPython Engine.
+ */
+public class ApexPythonInterpreterException extends RuntimeException
+{
+  public ApexPythonInterpreterException(Throwable cause)
+  {
+    super(cause);
+  }
+
+  public ApexPythonInterpreterException(String message)
+  {
+    super(message);
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/BasePythonExecutionOperator.java b/python/src/main/java/org/apache/apex/malhar/python/base/BasePythonExecutionOperator.java
new file mode 100644
index 0000000000..cfc2056d1f
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/BasePythonExecutionOperator.java
@@ -0,0 +1,601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.jep.InterpreterThread;
+import org.apache.apex.malhar.python.base.jep.JepPythonEngine;
+import org.apache.apex.malhar.python.base.jep.SpinPolicy;
+import org.apache.apex.malhar.python.base.partitioner.AbstractPythonExecutionPartitioner;
+import org.apache.apex.malhar.python.base.partitioner.PythonExecutionPartitionerType;
+import org.apache.apex.malhar.python.base.partitioner.ThreadStarvationBasedPartitioner;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/***
+ * <p>The operator provides a mechanism to execute arbitrary python code by using the {@link JepPythonEngine} as its
+ *  default engine.</p>
+ *
+ * <p>See {@link JepPythonEngine} and {@link InterpreterThread} for more detailed javadocs about the interpreter
+ *  itself and the API patterns possible through this engine</p>
+ *
+ * <p>Note that the JVM option of the library path needs to be set to a value which contains the JEP dynamic library.
+ *  For example, -Djava.library.path=/usr/local/lib/python3.5/site-packages/jep assuming that the JEP library is
+ *   installed at the above location
+ * </p>
+ *
+ * <p>If using CPython libraries which involve global variables, please use the
+ *  {@link PythonInterpreterConfig#PYTHON_SHARED_LIBS} as one of the keys to specify this library as a shared library.
+ *  Note that the interpreter configs are passed via {@link ApexPythonEngine#preInitInterpreter(Map)} method.
+ *  Overriding the {@link BasePythonExecutionOperator#getPreInitConfigurations()} and specifying the configs can
+ *   help in specifying the shared libraries that are loaded by all the interpreter threads accordingly.</p>
+ *
+ * <p>The operator comes with the following limitations:
+ * <ol>
+ *   <li>Cannot be used in THREAD LOCAL MODE where the downstream operator is using a different version of
+ *    the python interpreter</li>
+ *   <li>In certain use cases the operator cannot be used in CONTAINER LOCAL MODE when there are global defs in the
+ *    CPython library that is being used and the downstream operator depends on those globals (even though downstream
+ *     is using the same python version as the upstream operator)</li>
+ *   <li>Only CPython libraries are supported; the operator uses JNI mechanisms to access the CPython libraries</li>
+ *   <li>Complex data types cannot be automatically translated to the Python interpreter space. The current
+ *    workaround involves copying the basic types and building the complex type using python code and functionality
+ *     provided by the {@link JepPythonEngine} </li>
+ *   <li>Numpy arrays need to be specified as {@link NDimensionalArray} and the engine automatically translates them
+ *    to a Numpy array. See {@link NDimensionalArray#toNDArray()} for more details</li>
+ * </ol>
+ * </p>
+ *  <p>
+ *    Shared libraries enable sharing of global modules across interpreter workers in a worker pool.
+ *     The following snippet of code illustrates the registering of numpy as a shared module.
+ *     {@code
+ *       Map<PythonInterpreterConfig,Object> preInitConfigs = new HashMap<>();
+ *       Set<String> sharedLibsList = new HashSet<>();
+ *       sharedLibsList.add("numpy");
+ *       preInitConfigs.put(PythonInterpreterConfig.PYTHON_SHARED_LIBS, sharedLibsList);
+ *     }
+ *     Passing the above config in overriding {@link BasePythonExecutionOperator#getPreInitConfigurations()} will help
+ *      in using modules like numpy which have global shared variables.
+ *  </p>
+ *  <p>
+ *    If the operator code fails with the exception "module 'jep' has no attribute 'java_import_hook'",
+ *    JEP was not able to load a library that the python code is using. Ensure
+ *    that the PYTHONPATH environment variable points to the right location. This is especially
+ *     required when multiple python versions are in use.
+ *  </p>
+ *
+ * <p>
+ *     Java 9 is not yet supported. Support only exists for Java 8 and Java 7 runtimes.
+ * </p>
+ *
+ *  <p>For very low latency SLA requirements, consider setting the spin policy to busy wait using the
+ *   following configuration snippet for the JEP engine
+ *   {@code
+ *       Map<PythonInterpreterConfig,Object> preInitConfigs = new HashMap<>();
+ *       preInitConfigs.put(PythonInterpreterConfig.IDLE_INTERPRETER_SPIN_POLICY, SpinPolicy.BUSY_SPIN.name());
+ *   }
+ *   This mapping can be set by overriding {@link BasePythonExecutionOperator#getPreInitConfigurations()}. For more
+ *    details refer to {@link SpinPolicy}.
+ *  </p>
+ * @param <T> Represents the incoming tuple.
+ */
+public class BasePythonExecutionOperator<T> extends BaseOperator implements
+    Operator.ActivationListener<Context.OperatorContext>, Partitioner<BasePythonExecutionOperator>,
+    Operator.CheckpointNotificationListener, Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(BasePythonExecutionOperator.class);
+
+  /*  a rolling counter for all requests in the current window */
+  protected transient long requestCounterForThisWindow = 0;
+
+  protected transient long responseCounterForThisWindow = 0;
+
+  /***
+   * Blocks the operator thread in endWindow until all of the tuples for the window are successfully emitted. Note
+   *  that these might be emitted from the stragglers port as well.
+   */
+  protected boolean blockAtEndOfWindowForStragglers = false;
+
+  protected transient long currentWindowId = 0;
+
+  private long numberOfRequestsProcessedPerCheckpoint = 0;
+
+  /*** Configuration used to decide if a new operator needs to be spawned while dynamic partitioning planning is taking
+   place. This represents the threshold for the percentage of requests that were starved because no threads were
+    available to process a request. See {@link ThreadStarvationBasedPartitioner}
+   */
+  private float starvationPercentBeforeSpawningNewInstance = 30;
+
+  private transient ApexPythonEngine apexPythonEngine;
+
+  /*** number of workers in the worker pool */
+  private int workerThreadPoolSize = 3;
+
+  /** Can be used to set the additional File system paths that the interpreter can use to locate modules. Use
+   *  commas as separator when representing a collection of strings */
+  private String interpreterEngineIncludePaths;
+
+  /** Can be used to add shared modules that an interpreter engine can load if working within JVM memory space. Use
+   *  commas as separator when representing a collection of strings */
+  private String interpreterEngineSharedLibNames;
+
+  /** Used to represent the behaviour of the interpreter thread if no requests are found in its queue. Used to tune
+   *  for extremely sensitive SLAs for computation. See {@link SpinPolicy} for possible string values representation */
+  private String idleInterpreterSpinPolicy = SpinPolicy.SLEEP.name();
+
+  /** Used to represent the behaviour of the disruptor queue that processes request response dispatches. Tune this
+   *  to manage the low latency behaviors. See {@link SpinPolicy} for possible string value representation */
+  private String requestQueueWaitSpinPolicy = SpinPolicy.SLEEP.name();
+
+  private int sleepTimeInMSInCaseOfNoRequests = 5;
+
+
+  /*** Time for which the python engine will sleep to allow the interpreter to boot before a worker can be used
+   *  for executing work requests.
+   */
+  private long sleepTimeDuringInterpreterBoot = 2000L;
+
+  private PythonExecutionPartitionerType partitionerType = PythonExecutionPartitionerType.THREAD_STARVATION_BASED;
+
+  private transient AbstractPythonExecutionPartitioner partitioner;
+
+  /*** port into which the stragglers will be emitted */
+  public final transient DefaultOutputPort<PythonRequestResponse> stragglersPort =
+      new com.datatorrent.api.DefaultOutputPort<>();
+
+  /*** Port into which the normal execution path will push the results into */
+  public final transient DefaultOutputPort<PythonRequestResponse> outputPort =
+      new com.datatorrent.api.DefaultOutputPort<>();
+
+  /*** Port into which all error tuples will be emitted into */
+  public final transient DefaultOutputPort<T> errorPort = new com.datatorrent.api.DefaultOutputPort<>();
+
+  private Object objectForLocking = new Object();
+
+  /*** A counter that is used to track how many times a request could not be serviced within a given window. Used
+   by the {@link ThreadStarvationBasedPartitioner} to spawn a new instance of the operator based on a configuration
+   threshold
+   */
+  @AutoMetric
+  private long numStarvedReturnsPerCheckpoint = 0;
+
+  @AutoMetric
+  private long numNullResponsesPerWindow = 0;
+
+  /***
+   * Represents all python commands that need to be run on a new instance of the operator after dynamic partitioning
+   */
+  private List<PythonRequestResponse> accumulatedCommandHistory = new ArrayList<>();
+
+  /***
+   * Processes the incoming tuple using the python engine that is injected. Also emits stragglers if any.
+   */
+  @InputPortFieldAnnotation
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      numberOfRequestsProcessedPerCheckpoint += 1;
+      requestCounterForThisWindow += 1;
+      emitStragglers();
+      try {
+        PythonRequestResponse result = processPythonCodeForIncomingTuple(tuple,getApexPythonEngine());
+        if ( result != null) {
+          responseCounterForThisWindow += 1;
+          outputPort.emit(result);
+        } else {
+          numNullResponsesPerWindow += 1;
+        }
+      } catch (ApexPythonInterpreterException e) {
+        responseCounterForThisWindow += 1; // Error tuples need to be accounted for totals
+        errorPort.emit(tuple);
+        LOG.debug("Error while processing tuple", e);
+      }
+    }
+  };
+
+
+  @Override
+  public void activate(Context.OperatorContext context)
+  {
+    getApexPythonEngine().setNumStarvedReturns(0L);
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  /***
+   * Used to emit the stragglers into the Stragglers port. Invoked either when a new tuple arrives or when Idle time
+   *  is detected on this operator.
+   */
+  private void emitStragglers()
+  {
+    LOG.debug("Emitting stragglers");
+    List<PythonRequestResponse> stragglerResponse = new ArrayList<>();
+    getApexPythonEngine().getDelayedResponseQueue().drainTo(stragglerResponse);
+    for (PythonRequestResponse aReqResponse : stragglerResponse) {
+      responseCounterForThisWindow += 1;
+      stragglersPort.emit(aReqResponse);
+    }
+  }
+
+  /***
+   * Instantiates the configured python engine. Only in-memory implementation provided for now
+   * @param context The operator context
+   * @return The python engine
+   */
+  protected ApexPythonEngine initApexPythonEngineImpl(Context.OperatorContext context)
+  {
+    JepPythonEngine jepPythonEngine = new JepPythonEngine("" + context.getId(),workerThreadPoolSize);
+    jepPythonEngine.setSleepTimeAfterInterpreterStart(getSleepTimeDuringInterpreterBoot());
+    LOG.debug("Initialized Python engine " + jepPythonEngine);
+    return jepPythonEngine;
+  }
+
+  /***
+   * Instantiates the partitioner. Only Thread Starvation based partitioner for now.
+   */
+  private void initPartitioner()
+  {
+    if (partitioner == null) {
+      synchronized (objectForLocking) {
+        if (partitioner == null) {
+          switch (partitionerType) {
+            default:
+            case THREAD_STARVATION_BASED:
+              partitioner = new ThreadStarvationBasedPartitioner(this);
+              break;
+          }
+        }
+      }
+    }
+  }
+
+  /***
+   * Starts the python engine and then sleeps for a configurable time to ensure that the interpreter is
+   *  completely booted up in memory. The time taken to boot the interpreter depends on factors such as the
+   *  libraries that are loaded, so tune sleepTimeDuringInterpreterBoot accordingly.
+   * @param context The Operator context
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    LOG.debug("Initializing the python interpreter setup");
+    apexPythonEngine = initApexPythonEngineImpl(context);
+    Map<PythonInterpreterConfig,Object> preInitConfigurations = getPreInitConfigurations();
+    apexPythonEngine.preInitInterpreter(preInitConfigurations);
+    apexPythonEngine.startInterpreter();
+    LOG.debug("Python interpreter started. Now invoking post interpreter logic");
+    processPostSetUpPythonInstructions(apexPythonEngine);
+    LOG.debug("Python post interpreter logic complete");
+  }
+
+  @Override
+  public void teardown()
+  {
+    super.teardown();
+    apexPythonEngine.stopInterpreter();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    requestCounterForThisWindow = 0;
+    responseCounterForThisWindow = 0;
+    numNullResponsesPerWindow = 0;
+    currentWindowId = windowId;
+  }
+
+  /***
+   * If blockAtEndOfWindowForStragglers is set to true (via a setter or config), then end window loops
+   *  until the number of responses emitted in this window catches up with the number of requests, emitting
+   *  stragglers as they arrive and sleeping for 50 ms between each bulk emit. If it is set to false, end window
+   *  returns without waiting, and any remaining stragglers are emitted later when a new tuple arrives or when
+   *  idle time is detected.
+   */
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+    if (responseCounterForThisWindow < requestCounterForThisWindow) {
+      LOG.debug("Detected stragglers and configured flag/state for blocking at end of window is " +
+          blockAtEndOfWindowForStragglers);
+      if (blockAtEndOfWindowForStragglers) {
+        LOG.debug("Trying to emit all stragglers before the next window can be processed");
+        while (responseCounterForThisWindow < requestCounterForThisWindow) {
+          emitStragglers();
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            LOG.debug("Thread interrupted while trying to emit all stragglers");
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public Collection<Partition<BasePythonExecutionOperator>> definePartitions(
+      Collection<Partition<BasePythonExecutionOperator>> partitions, PartitioningContext context)
+  {
+    initPartitioner();
+    return partitioner.definePartitions(partitions,context);
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<BasePythonExecutionOperator>> partitions)
+  {
+    initPartitioner();
+    partitioner.partitioned(partitions);
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+    accumulatedCommandHistory.clear();
+    accumulatedCommandHistory.addAll(getApexPythonEngine().getCommandHistory());
+    numStarvedReturnsPerCheckpoint = getApexPythonEngine().getNumStarvedReturns();
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+    getApexPythonEngine().setNumStarvedReturns(0L);
+    numberOfRequestsProcessedPerCheckpoint = 0;
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+  }
+
+  /***
+   * Override this to hook any logic that would be needed immediately after setup of the interpreter. Some of the
+   * actions include setting up an interpreter state which can be used across all tuple invocations
+   * @param pythonEngine
+   * @throws ApexPythonInterpreterException
+   */
+  public void processPostSetUpPythonInstructions(ApexPythonEngine pythonEngine) throws ApexPythonInterpreterException
+  {
+    pythonEngine.postStartInterpreter();
+  }
+
+  @Override
+  public void handleIdleTime()
+  {
+    emitStragglers();
+  }
+
+  public ApexPythonEngine getApexPythonEngine()
+  {
+    return apexPythonEngine;
+  }
+
+  public void setApexPythonEngine(ApexPythonEngine apexPythonEngine)
+  {
+    this.apexPythonEngine = apexPythonEngine;
+  }
+
+  /***
+   * Override this method to perform any meaningful work in python interpreter space.
+   * @param input The incoming tuple
+   * @param pythonEngineRef The implementation of the Python interpreter that can be used to execute python commands
+   * @return The result of execution wrapped as a PythonRequestResponse Object
+   * @throws ApexPythonInterpreterException if interrupted or no workers available to execute the python code.
+   */
+  public PythonRequestResponse processPythonCodeForIncomingTuple(T input, ApexPythonEngine pythonEngineRef)
+    throws ApexPythonInterpreterException
+  {
+    return null;
+  }
+
+  /***
+   * See the constants defined in {@link PythonInterpreterConfig} for the list of available keys. The application
+   *  properties xml file can be used to set the values for the interpreter configuration. Override this method
+   *  to refine the configuration that is used to start the interpreter instance. See the test
+   *   application implemented in the test code for example usage.
+   * @return The configuration map that is passed to the interpreter before it is started
+   */
+  public Map<PythonInterpreterConfig,Object> getPreInitConfigurations()
+  {
+    Map<PythonInterpreterConfig,Object> configsForInterpreter = new HashMap<>();
+    String includePaths = getInterpreterEngineIncludePaths();
+    if (includePaths != null) {
+      List<String> interpreterIncludePaths = new ArrayList<>();
+      interpreterIncludePaths.addAll(Arrays.asList(includePaths.split(",")));
+      configsForInterpreter.put(PythonInterpreterConfig.PYTHON_INCLUDE_PATHS, interpreterIncludePaths);
+    }
+    String sharedLibs = getInterpreterEngineSharedLibNames();
+    if (sharedLibs != null) {
+      List<String> sharedLibNames = new ArrayList<>();
+      sharedLibNames.addAll(Arrays.asList(sharedLibs.split(",")));
+      configsForInterpreter.put(PythonInterpreterConfig.PYTHON_SHARED_LIBS, sharedLibNames);
+    }
+    configsForInterpreter.put(PythonInterpreterConfig.IDLE_INTERPRETER_SPIN_POLICY, getIdleInterpreterSpinPolicy());
+    configsForInterpreter.put(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY, getRequestQueueWaitSpinPolicy());
+    configsForInterpreter.put(PythonInterpreterConfig.SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS,
+        getSleepTimeInMSInCaseOfNoRequests());
+    return configsForInterpreter;
+  }
+
+  public long getSleepTimeDuringInterpreterBoot()
+  {
+    return sleepTimeDuringInterpreterBoot;
+  }
+
+  public void setSleepTimeDuringInterpreterBoot(long sleepTimeDuringInterpreterBoot)
+  {
+    this.sleepTimeDuringInterpreterBoot = sleepTimeDuringInterpreterBoot;
+  }
+
+  public int getWorkerThreadPoolSize()
+  {
+    return workerThreadPoolSize;
+  }
+
+  public void setWorkerThreadPoolSize(int workerThreadPoolSize)
+  {
+    this.workerThreadPoolSize = workerThreadPoolSize;
+  }
+
+  public PythonExecutionPartitionerType getPartitionerType()
+  {
+    return partitionerType;
+  }
+
+  public void setPartitionerType(PythonExecutionPartitionerType partitionerType)
+  {
+    this.partitionerType = partitionerType;
+  }
+
+  public long getNumberOfRequestsProcessedPerCheckpoint()
+  {
+    return numberOfRequestsProcessedPerCheckpoint;
+  }
+
+  public void setNumberOfRequestsProcessedPerCheckpoint(long numberOfRequestsProcessedPerCheckpoint)
+  {
+    this.numberOfRequestsProcessedPerCheckpoint = numberOfRequestsProcessedPerCheckpoint;
+  }
+
+  public AbstractPythonExecutionPartitioner getPartitioner()
+  {
+    return partitioner;
+  }
+
+  public void initPartitioner(AbstractPythonExecutionPartitioner partitioner)
+  {
+    this.partitioner = partitioner;
+  }
+
+  public float getStarvationPercentBeforeSpawningNewInstance()
+  {
+    return starvationPercentBeforeSpawningNewInstance;
+  }
+
+  public void setStarvationPercentBeforeSpawningNewInstance(float starvationPercentBeforeSpawningNewInstance)
+  {
+    this.starvationPercentBeforeSpawningNewInstance = starvationPercentBeforeSpawningNewInstance;
+  }
+
+  public long getNumStarvedReturns()
+  {
+    return numStarvedReturnsPerCheckpoint;
+  }
+
+  public void setNumStarvedReturns(long numStarvedReturns)
+  {
+    this.numStarvedReturnsPerCheckpoint = numStarvedReturns;
+  }
+
+  public List<PythonRequestResponse> getAccumulatedCommandHistory()
+  {
+    return accumulatedCommandHistory;
+  }
+
+  public void setAccumulatedCommandHistory(List<PythonRequestResponse> accumulatedCommandHistory)
+  {
+    this.accumulatedCommandHistory = accumulatedCommandHistory;
+  }
+
+  public boolean isBlockAtEndOfWindowForStragglers()
+  {
+    return blockAtEndOfWindowForStragglers;
+  }
+
+  public void setBlockAtEndOfWindowForStragglers(boolean blockAtEndOfWindowForStragglers)
+  {
+    this.blockAtEndOfWindowForStragglers = blockAtEndOfWindowForStragglers;
+  }
+
+  public String getInterpreterEngineIncludePaths()
+  {
+    return interpreterEngineIncludePaths;
+  }
+
+  public void setInterpreterEngineIncludePaths(String interpreterEngineIncludePaths)
+  {
+    this.interpreterEngineIncludePaths = interpreterEngineIncludePaths;
+  }
+
+  public String getInterpreterEngineSharedLibNames()
+  {
+    return interpreterEngineSharedLibNames;
+  }
+
+  public void setInterpreterEngineSharedLibNames(String interpreterEngineSharedLibNames)
+  {
+    this.interpreterEngineSharedLibNames = interpreterEngineSharedLibNames;
+  }
+
+  public String getIdleInterpreterSpinPolicy()
+  {
+    return idleInterpreterSpinPolicy;
+  }
+
+  public void setIdleInterpreterSpinPolicy(String idleInterpreterSpinPolicy)
+  {
+    this.idleInterpreterSpinPolicy = idleInterpreterSpinPolicy;
+  }
+
+  public String getRequestQueueWaitSpinPolicy()
+  {
+    return requestQueueWaitSpinPolicy;
+  }
+
+  public void setRequestQueueWaitSpinPolicy(String requestQueueWaitSpinPolicy)
+  {
+    this.requestQueueWaitSpinPolicy = requestQueueWaitSpinPolicy;
+  }
+
+  public int getSleepTimeInMSInCaseOfNoRequests()
+  {
+    return sleepTimeInMSInCaseOfNoRequests;
+  }
+
+  public void setSleepTimeInMSInCaseOfNoRequests(int sleepTimeInMSInCaseOfNoRequests)
+  {
+    this.sleepTimeInMSInCaseOfNoRequests = sleepTimeInMSInCaseOfNoRequests;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/PythonInterpreterConfig.java b/python/src/main/java/org/apache/apex/malhar/python/base/PythonInterpreterConfig.java
new file mode 100644
index 0000000000..f904d9826b
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/PythonInterpreterConfig.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+import java.util.Map;
+
+import org.apache.apex.malhar.python.base.jep.JepPythonEngine;
+
+/***
+ * Keys used when passing the pre-init configuration to the interpreter. See
+ *  {@link JepPythonEngine#preInitInterpreter(Map)}
+ */
+public enum PythonInterpreterConfig
+{
+  PYTHON_INCLUDE_PATHS,
+  PYTHON_SHARED_LIBS,
+  IDLE_INTERPRETER_SPIN_POLICY,
+  REQUEST_QUEUE_WAIT_SPIN_POLICY,
+  SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS;
+
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/WorkerExecutionMode.java b/python/src/main/java/org/apache/apex/malhar/python/base/WorkerExecutionMode.java
new file mode 100644
index 0000000000..e024ab0a53
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/WorkerExecutionMode.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base;
+
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+
+/***
+ * <p>Used to specify if a given API invocation in the in-memory interpreter is going to be invoked on all the worker
+ *  threads, a sticky thread or just any one thread.
+ *
+ *  - {@link WorkerExecutionMode#BROADCAST} is to be used when the
+ *   command results in interpreter state that has to be available to subsequent calls. For example,
+ *   deserializing a machine learning model can be issued in BROADCAST mode so that scoring can then be invoked on
+ *   any of the worker threads if required.
+ *  - {@link WorkerExecutionMode#ANY} Represents a mode wherein any worker can be used to execute the code.
+ *   An example would be scoring an incoming tuple after the model has already been deserialized on all workers.
+ *  - {@link WorkerExecutionMode#STICKY} Use STICKY if the same worker needs to service the request.
+ *    The downside of this is that it may or may not complete on time, depending on the queue length.</p>
+ *
+ *   <p><b>Ensure the {@link PythonInterpreterRequest#hashCode()} is overridden if STICKY is chosen </b></p>
+ */
+public enum WorkerExecutionMode
+{
+  BROADCAST,
+  ANY,
+  STICKY
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterThread.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterThread.java
new file mode 100644
index 0000000000..75fa5520eb
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterThread.java
@@ -0,0 +1,626 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.requestresponse.EvalCommandRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.MethodCallRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterResponse;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.base.requestresponse.ScriptExecutionRequestPayload;
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+
+import jep.Jep;
+import jep.JepConfig;
+import jep.JepException;
+import jep.NDArray;
+
+import static org.apache.apex.malhar.python.base.PythonInterpreterConfig.IDLE_INTERPRETER_SPIN_POLICY;
+import static org.apache.apex.malhar.python.base.PythonInterpreterConfig.PYTHON_INCLUDE_PATHS;
+import static org.apache.apex.malhar.python.base.PythonInterpreterConfig.PYTHON_SHARED_LIBS;
+import static org.apache.apex.malhar.python.base.PythonInterpreterConfig.SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS;
+
+/**
+ * <p>
+ * Represents a python interpreter instance embedded in JVM memory using the JEP ( Java embedded Python ) engine.
+ * JEP uses a JNI wrapper around the embedded Python instance. JEP mandates that the thread that created the JEP
+ * instance is the only thread that can perform method calls on the embedded interpreter. This requires
+ * the Apex operator implementation to decouple JEP execution logic from the operator processing main thread.
+ * <b>Note that this embedded python is an interpreter and this essentially means the state of the interpreter
+ * is maintained across all calls to the interpreter.</b>
+ * The threaded implementation provides the following main functionalities
+ * <ol>
+ *   <li>An evaluation expression that can interpret a string as a python command. The user can also set
+ *    variable values that are
+ *      <ul>
+ *         <li>Transferred to the interpreter with the same variable names</li>
+ *         <li>Garbage collected from the python interpreter space</li>
+ *      </ul>
+ *   </li>
+ *   <li>
+ *     A method call invocation wherein parameters can be sent to a previously defined method (the method must
+ *      already be defined, perhaps via an eval expression or a previous execute script call)
+ *   </li>
+ *   <li>A script call command that can execute a script. There is currently no support to pass params to scripts</li>
+ *   <li>A handy mechanism to execute a series of commands. Note that this is a simple wrapper around the
+ *    eval expression. The main difference here is that no user variable substitution is performed in
+ *     this model. This is useful for statements like import ( ex: import numpy as np ) which do not require
+ *      user variable conversion</li>
+ * </ol>
+ * </p>
+ *
+ * <p>
+ *   The logic is executed using a request and response queue model. The thread keeps consuming from a request queue
+ *   and submits results to a response queue.
+ * </p>
+ * <p>
+ *   Note that all outputs are redirected to the standard logger. Hence statements like print(secret)
+ *   need to be avoided, as the output of the print command is captured in the log.
+ * </p>
+ * <p>
+ *   When using CPython extension libraries like numpy, <b>ensure you first register numpy as a shared
+ *   library</b> before using it, even in import statements. Not doing so will result in very obscure errors.
+ * </p>
+ */
+
+public class InterpreterThread implements Runnable
+{
+  private static final Logger LOG = LoggerFactory.getLogger(InterpreterThread.class);
+
+  /* Name of the dynamically loaded JNI library */
+  public static final String JEP_LIBRARY_NAME = "jep";
+
+  /* The string command which will be used to delete python variables after they are used. */
+  public static final String PYTHON_DEL_COMMAND = "del ";
+
+  public transient Jep JEP_INSTANCE;
+
+  /* Used by the operator thread or other threads to mark the stopping of processing of the interpreter command loop */
+  private transient volatile boolean isStopped = false;
+
+  /* Used to represent the current state of this thread whether it is currently busy executing a command */
+  private transient volatile boolean busyFlag = false;
+
+  /* Represents the default amount of time that this thread will wait to read a command from the request queue */
+  private long timeOutToPollFromRequestQueue = 1;
+
+  private TimeUnit timeUnitsToPollFromRequestQueue = TimeUnit.MILLISECONDS;
+
+  private transient volatile BlockingQueue<PythonRequestResponse> requestQueue;
+
+  private transient volatile BlockingQueue<PythonRequestResponse> responseQueue;
+
+  /* An id that can be useful while logging statements */
+  private String threadID;
+
+  /* Whether this thread should sleep for a few moments if there are no requests or keep checking the request queue */
+  private SpinPolicy spinPolicy = SpinPolicy.SLEEP;
+
+  /* Holds the configs that are used to initialize the interpreter thread. Examples of config are shared libraries and
+  include paths for the interpreter. The key is one of the constants defined in PythonInterpreterConfig and value
+    is specific to the config type that is being set.
+   */
+  private Map<PythonInterpreterConfig,Object> initConfigs = new HashMap<>();
+
+  /* Used as a flag to denote an error situation in the interpreter so that the next commands can first run
+   *  an empty/null eval expression to clear any erroneous state  */
+  private boolean errorEncountered = false;
+
+  private long sleepTimeMsInCaseOfNoRequests = 1;
+
+  /***
+   * Constructs an interpreter thread instance. Note that the constructor does not start the interpreter in memory yet.
+   * @param requestQueue The queue from which requests will be processed.
+   * @param responseQueue The queue into which the responses will be written
+   * @param threadID An identifier for this thread, used as a marker in log statements
+   */
+  public InterpreterThread(BlockingQueue<PythonRequestResponse> requestQueue,
+      BlockingQueue<PythonRequestResponse> responseQueue,String threadID)
+  {
+    this.requestQueue = requestQueue;
+    this.responseQueue = responseQueue;
+    this.threadID = threadID;
+  }
+
+  /***
+   * Loads the JEP dynamic library for the JVM to use the JNI bridge into the interpreter
+   * @throws ApexPythonInterpreterException if the library could not be loaded or located
+   */
+  private void loadMandatoryJVMLibraries() throws ApexPythonInterpreterException
+  {
+    LOG.info("Java library path being used for Interpreter ID " + threadID + " " +
+        System.getProperty("java.library.path"));
+    try {
+      System.loadLibrary(JEP_LIBRARY_NAME);
+    } catch (Exception e) {
+      throw new ApexPythonInterpreterException(e);
+    }
+    LOG.info("JEP library loaded successfully");
+  }
+
+
+  public Jep getEngineReference() throws ApexPythonInterpreterException
+  {
+    return JEP_INSTANCE;
+  }
+
+  /***
+   * Executes the logic required before the start of the interpreter. In this case, it is just registering of the
+   * configs which are to be used when the interpreter is about to load
+   * @param preInitConfigs
+   * @throws ApexPythonInterpreterException
+   */
+  public void preInitInterpreter(Map<PythonInterpreterConfig, Object> preInitConfigs)
+    throws ApexPythonInterpreterException
+  {
+    initConfigs.putAll(preInitConfigs);
+  }
+
+  /***
+   * Starts the interpreter by loading the shared libraries
+   * @throws ApexPythonInterpreterException if the interpreter could not be started
+   */
+  public void startInterpreter() throws ApexPythonInterpreterException
+  {
+    Thread.currentThread().setName(threadID);
+    Thread.currentThread().setPriority(Thread.MAX_PRIORITY); // To allow for time aware calls
+    loadMandatoryJVMLibraries();
+    JepConfig config = new JepConfig()
+        .setRedirectOutputStreams(true)
+        .setInteractive(false)
+        .setClassLoader(Thread.currentThread().getContextClassLoader()
+    );
+    if (initConfigs.containsKey(PYTHON_INCLUDE_PATHS)) {
+      List<String> includePaths = (List<String>)initConfigs.get(PYTHON_INCLUDE_PATHS);
+      if ( includePaths != null) {
+        LOG.info("Adding include path for the in-memory interpreter instance");
+        for (String anIncludePath: includePaths) {
+          config.addIncludePaths(anIncludePath);
+        }
+      }
+    }
+    if (initConfigs.containsKey(PYTHON_SHARED_LIBS)) {
+      Set<String> sharedLibs = (Set<String>)initConfigs.get(PYTHON_SHARED_LIBS);
+      if ( sharedLibs != null) {
+        config.setSharedModules(sharedLibs);
+        LOG.info("Loaded " + sharedLibs.size() + " shared libraries as config");
+      }
+    } else {
+      LOG.info(" No shared libraries loaded");
+    }
+    if (initConfigs.containsKey(IDLE_INTERPRETER_SPIN_POLICY)) {
+      spinPolicy = SpinPolicy.valueOf((String)initConfigs.get(IDLE_INTERPRETER_SPIN_POLICY));
+      LOG.debug("Configuring spin policy to be " + spinPolicy);
+    }
+    if (initConfigs.containsKey(SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS)) {
+      sleepTimeMsInCaseOfNoRequests = ((Number)initConfigs.get(SLEEP_TIME_MS_IN_CASE_OF_NO_REQUESTS)).longValue();
+      LOG.debug("Configuring sleep time for no requests situation to be " + sleepTimeMsInCaseOfNoRequests);
+    }
+    try {
+      LOG.info("Launching the in-memory interpreter");
+      JEP_INSTANCE = new Jep(config);
+    } catch (JepException e) {
+      LOG.error(e.getMessage(),e); // Purposefully logging as this will help in startup issues being captured inline
+      throw new ApexPythonInterpreterException(e);
+    }
+  }
+
+  /***
+   * Runs a series of interpreter commands. Note that no params can be passed from the JVM to the python interpreter
+   * space
+   * @param commands The series of commands that will be executed sequentially
+   * @return A map containing the result of execution of each of the commands. The command is the key that was
+   * passed as input and the value is a boolean whether the command was executed successfully
+   */
+  private Map<String,Boolean> runCommands(List<String> commands)
+  {
+    LOG.debug("Executing run commands");
+    Map<String,Boolean> resultsOfExecution = new HashMap<>();
+    for (String aCommand : commands) {
+      LOG.debug("Executing command " + aCommand);
+      try {
+        resultsOfExecution.put(aCommand,JEP_INSTANCE.eval(aCommand));
+      } catch (JepException e) {
+        resultsOfExecution.put(aCommand,Boolean.FALSE);
+        errorEncountered = true;
+        LOG.error("Error while running command " + aCommand, e);
+        return resultsOfExecution;
+      }
+    }
+    return resultsOfExecution;
+  }
+
+  /***
+   * Executes a method call by passing any parameters to the method call. The params are passed in the order they are
+   *  set in the list.
+   * @param nameOfGlobalMethod Name of the method to invoke
+   * @param argsToGlobalMethod Arguments to the method call. Typecasting is interpreted at runtime and hence multiple
+   *                           types can be sent as part of the parameter list
+   * @param type The class of the return parameter. Note that in some cases the return type will be the highest possible
+   *             bit size. For example, addition of two ints passed in might be returned as a Long by the interpreter.
+   * @param <T> Represents the type of the return parameter
+   * @return The response from the method call that the python method returned
+   */
+  private <T> T executeMethodCall(String nameOfGlobalMethod, List<Object> argsToGlobalMethod, Class<T> type)
+  {
+    LOG.debug("Executing method call invocation");
+    try {
+      if ((argsToGlobalMethod != null) && (argsToGlobalMethod.size() > 0)) {
+        List<Object> paramsToPass = argsToGlobalMethod;
+        List<Object> modifiedParams = new ArrayList<>();
+        for ( Object aMethodParam: argsToGlobalMethod) {
+          if (aMethodParam instanceof NDimensionalArray) {
+            LOG.debug(aMethodParam + " is of type NDimensional array and hence converting to JEP NDArray");
+            modifiedParams.add(((NDimensionalArray)aMethodParam).toNDArray());
+          } else {
+            modifiedParams.add(aMethodParam);
+          }
+        }
+        LOG.debug("Executing method " + nameOfGlobalMethod + " with " + modifiedParams.size() + " parameters");
+        return type.cast(JEP_INSTANCE.invoke(nameOfGlobalMethod,modifiedParams.toArray()));
+      } else {
+        LOG.debug("Executing " + nameOfGlobalMethod + " with no parameters");
+        return type.cast(JEP_INSTANCE.invoke(nameOfGlobalMethod,new ArrayList<>().toArray()));
+      }
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error("Error while executing method " + nameOfGlobalMethod, e);
+    }
+    return null;
+  }
+
+  /***
+   * Executes a python script which can be located in the path
+   * @param pathToScript The path to the script
+   * @return true if the script invocation was successful or false otherwise
+   */
+  private boolean executeScript(String pathToScript)
+  {
+    LOG.debug("Executing script at path " + pathToScript);
+    try {
+      JEP_INSTANCE.runScript(pathToScript);
+      return true;
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error(" Error while executing script " + pathToScript, e);
+    }
+    return false;
+  }
+
+  /***
+   * Evaluates a string expression by passing any variable substitution into the Interpreter space if required. Also
+   * handles the garbage collection of the variables passed and offers a configurable way to delete any variable created
+   *  as part of the evaluation expression.
+   * @param command The string equivalent of the command
+   * @param variableToExtract The name of the variable that would need to be extracted from the python interpreter space
+   *                          to the JVM space.
+   * @param variableSubstituionParams Key value pairs representing the variables that need to be passed into the
+   *                                  interpreter space and are part of the eval expression.
+   * @param deleteExtractedVariable if the L.H.S. of an assignment expression variable needs to be deleted. This is
+   *                                essentially the variable that is being requested to extract i.e. the second
+   *                                parameter to this method.
+   * @param expectedReturnType Class representing the expected return type
+   * @param <T> Template signature for the expected return type
+   * @return The value that is extracted from the interpreter space ( possibly created as part of the eval expression or
+   *  otherwise ). Returns null if any error
+   */
+  private <T> T eval(String command, String variableToExtract, Map<String, Object> variableSubstituionParams,
+      boolean deleteExtractedVariable,Class<T> expectedReturnType)
+  {
+    T variableToReturn = null;
+    LOG.debug("Executing eval expression " + command + " with return type : " + expectedReturnType);
+    try {
+      for (String aKey : variableSubstituionParams.keySet()) {
+        Object keyVal = variableSubstituionParams.get(aKey);
+        if (keyVal instanceof NDimensionalArray) {
+          keyVal = ((NDimensionalArray)keyVal).toNDArray();
+        }
+        JEP_INSTANCE.set(aKey, keyVal);
+      }
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error("Error while setting the params for eval expression " + command, e);
+      return null;
+    }
+    try {
+      LOG.debug("Executing the eval expression in the interpreter instance " + command);
+      JEP_INSTANCE.eval(command);
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error("Error while evaluating the expression " + command, e);
+      return null;
+    }
+    try {
+      if (variableToExtract != null) {
+        Object extractedVariable = JEP_INSTANCE.getValue(variableToExtract);
+        if (extractedVariable instanceof NDArray) {
+          LOG.debug(" Return type is a NumPy Array. Hence converting to NDimensionalArray instance");
+          NDArray ndArrayJepVal = (NDArray)extractedVariable;
+          NDimensionalArray nDimArray = new NDimensionalArray();
+          nDimArray.setData(ndArrayJepVal.getData());
+          nDimArray.setSignedFlag(ndArrayJepVal.isUnsigned());
+          int[] dimensions = ndArrayJepVal.getDimensions();
+          nDimArray.setDimensions(dimensions);
+          int lengthInOneDimension = 1;
+          for ( int i = 0; i < dimensions.length; i++) {
+            lengthInOneDimension *= dimensions[i];
+          }
+          nDimArray.setLengthOfSequentialArray(lengthInOneDimension);
+          variableToReturn = expectedReturnType.cast(nDimArray);
+        } else {
+          variableToReturn =  expectedReturnType.cast(extractedVariable);
+        }
+        if (deleteExtractedVariable) {
+          LOG.debug("Deleting the extracted variable from the Python interpreter space");
+          JEP_INSTANCE.eval(PYTHON_DEL_COMMAND + variableToExtract);
+        }
+      }
+      LOG.debug("Deleting all the variables from the python interpreter space ");
+      for (String aKey: variableSubstituionParams.keySet()) {
+        LOG.debug("Deleting " + aKey);
+        JEP_INSTANCE.eval(PYTHON_DEL_COMMAND + aKey);
+      }
+    } catch (JepException e) {
+      errorEncountered = true;
+      LOG.error("Error while evaluating delete part of expression " + command, e);
+      return null;
+    }
+    return variableToReturn;
+  }
+
+  /***
+   * Stops the interpreter as requested from the operator/main thread
+   * @throws ApexPythonInterpreterException if not able to stop the interpreter
+   */
+  public void stopInterpreter() throws ApexPythonInterpreterException
+  {
+    isStopped = true;
+    LOG.info("Attempting to close the interpreter thread");
+    try {
+      JEP_INSTANCE.close();
+    } catch (Exception e) {
+      LOG.error("Error while stopping the interpreter thread ", e);
+      throw new ApexPythonInterpreterException(e);
+    }
+    LOG.info("Interpreter closed");
+  }
+
+  /***
+   * Responsible for polling the request queue and formatting the request payload to make it compatible to the
+   *  individual processing logic of the functionalities provided by the interpreter API methods.
+   * @param <T> Java templating signature enforcement
+   * @throws ApexPythonInterpreterException if an unrecognized command is issued.
+   * @throws InterruptedException if interrupted while trying to wait for a request from request queue
+   */
+  private <T> void processCommand() throws ApexPythonInterpreterException, InterruptedException
+  {
+
+    PythonRequestResponse requestResponseHandle = requestQueue.poll(timeOutToPollFromRequestQueue,
+        timeUnitsToPollFromRequestQueue);
+    if (requestResponseHandle != null) {
+      LOG.debug("Processing command " + requestResponseHandle.getPythonInterpreterRequest().getCommandType());
+      busyFlag = true;
+      if (errorEncountered) {
+        LOG.debug("Error state detected from a previous command. Resetting state to the state before" +
+            " the error");
+        try {
+          JEP_INSTANCE.eval(null);
+          errorEncountered = false;
+        } catch (JepException e) {
+          LOG.error("Error while trying to clear the state of the interpreter due to previous command" +
+              " " + e.getMessage(), e);
+        }
+      }
+      PythonInterpreterRequest<T> request =
+          requestResponseHandle.getPythonInterpreterRequest();
+      PythonInterpreterResponse<T> response =
+          requestResponseHandle.getPythonInterpreterResponse();
+      Map<String,Boolean> commandStatus = new HashMap<>(1);
+      switch (request.getCommandType()) {
+        case EVAL_COMMAND:
+          EvalCommandRequestPayload evalPayload = request.getEvalCommandRequestPayload();
+          T responseVal = eval(evalPayload.getEvalCommand(), evalPayload.getVariableNameToExtractInEvalCall(),
+              evalPayload.getParamsForEvalCommand(), evalPayload.isDeleteVariableAfterEvalCall(),
+              request.getExpectedReturnType());
+          response.setResponse(responseVal);
+          if (responseVal != null) {
+            commandStatus.put(evalPayload.getEvalCommand(),Boolean.TRUE);
+          } else {
+            commandStatus.put(evalPayload.getEvalCommand(),Boolean.FALSE);
+          }
+          response.setCommandStatus(commandStatus);
+          break;
+        case SCRIPT_COMMAND:
+          ScriptExecutionRequestPayload scriptPayload = request.getScriptExecutionRequestPayload();
+          if (executeScript(scriptPayload.getScriptName())) {
+            commandStatus.put(scriptPayload.getScriptName(),Boolean.TRUE);
+          } else {
+            commandStatus.put(scriptPayload.getScriptName(),Boolean.FALSE);
+          }
+          response.setCommandStatus(commandStatus);
+          break;
+        case METHOD_INVOCATION_COMMAND:
+          MethodCallRequestPayload requestpayload = request.getMethodCallRequest();
+          response.setResponse(executeMethodCall(
+              requestpayload.getNameOfMethod(), requestpayload.getArgs(), request.getExpectedReturnType()));
+          if (response.getResponse() == null) {
+            commandStatus.put(requestpayload.getNameOfMethod(), Boolean.FALSE);
+          } else {
+            commandStatus.put(requestpayload.getNameOfMethod(), Boolean.TRUE);
+          }
+          response.setCommandStatus(commandStatus);
+          break;
+        case GENERIC_COMMANDS:
+          response.setCommandStatus(runCommands(request.getGenericCommandsRequestPayload().getGenericCommands()));
+          break;
+        default:
+          throw new ApexPythonInterpreterException(new Exception("Unspecified Interpreter command"));
+      }
+      requestResponseHandle.setRequestCompletionTime(System.currentTimeMillis());
+      responseQueue.put(requestResponseHandle);
+      LOG.debug("Submitted the response and executed " + response.getCommandStatus().size() + " instances of command");
+    }
+    busyFlag = false;
+  }
+
+  /***
+   * Starts the interpreter as soon as the thread starts running. JEP stipulates that only the thread which started
+   *  the interpreter can issue subsequent calls/invocations; this is a JNI limitation. The thread then consumes
+   *   requests from the request queue and processes them. If there are no requests present then the thread can
+   *    go to sleep based on the {@link SpinPolicy} configured. The spin policy is passed in as one of the pre-init
+   *     configurations. See {@link PythonInterpreterConfig} for more details.
+   */
+  @Override
+  public void run()
+  {
+    LOG.info("Starting the execution of Interpreter thread ");
+    if (JEP_INSTANCE == null) {
+      LOG.info("Initializing the interpreter state");
+      startInterpreter();
+      LOG.info("Successfully initialized the interpreter");
+    }
+    while (!isStopped) {
+      if ( (requestQueue.isEmpty()) && (spinPolicy == SpinPolicy.SLEEP)) {
+        LOG.debug("Sleeping the current thread as there are no more requests to process from the queue");
+        try {
+          Thread.sleep(sleepTimeMsInCaseOfNoRequests);
+          continue;
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      try {
+        processCommand();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    LOG.info("Stop condition detected for this thread. Stopping the in-memory interpreter now...");
+    stopInterpreter();
+  }
+
+  public Jep getJEP_INSTANCE()
+  {
+    return JEP_INSTANCE;
+  }
+
+  public void setJEP_INSTANCE(Jep JEP_INSTANCE)
+  {
+    this.JEP_INSTANCE = JEP_INSTANCE;
+  }
+
+  public long getTimeOutToPollFromRequestQueue()
+  {
+    return timeOutToPollFromRequestQueue;
+  }
+
+  public void setTimeOutToPollFromRequestQueue(long timeOutToPollFromRequestQueue)
+  {
+    this.timeOutToPollFromRequestQueue = timeOutToPollFromRequestQueue;
+  }
+
+  public TimeUnit getTimeUnitsToPollFromRequestQueue()
+  {
+    return timeUnitsToPollFromRequestQueue;
+  }
+
+  public void setTimeUnitsToPollFromRequestQueue(TimeUnit timeUnitsToPollFromRequestQueue)
+  {
+    this.timeUnitsToPollFromRequestQueue = timeUnitsToPollFromRequestQueue;
+  }
+
+  public boolean isStopped()
+  {
+    return isStopped;
+  }
+
+  public void setStopped(boolean stopped)
+  {
+    isStopped = stopped;
+  }
+
+  public BlockingQueue<PythonRequestResponse> getRequestQueue()
+  {
+    return requestQueue;
+  }
+
+  public void setRequestQueue(BlockingQueue<PythonRequestResponse> requestQueue)
+  {
+    this.requestQueue = requestQueue;
+  }
+
+  public BlockingQueue<PythonRequestResponse> getResponseQueue()
+  {
+    return responseQueue;
+  }
+
+  public void setResponseQueue(BlockingQueue<PythonRequestResponse> responseQueue)
+  {
+    this.responseQueue = responseQueue;
+  }
+
+  public Map<PythonInterpreterConfig, Object> getInitConfigs()
+  {
+    return initConfigs;
+  }
+
+  public void setInitConfigs(Map<PythonInterpreterConfig, Object> initConfigs)
+  {
+    this.initConfigs = initConfigs;
+  }
+
+  public boolean isBusy()
+  {
+    boolean busyState = busyFlag;
+    if (!requestQueue.isEmpty()) { // This is required because the interpreter thread sleeps for 1 ms to let other
+      //  threads work when checking the queue for request availability. Hence the busy state flag is not necessarily
+      // updated during this sleep window even if there is a pending request
+      busyState = true;
+    }
+    return busyState;
+  }
+
+  public void setBusy(boolean busy)
+  {
+    busyFlag = busy;
+  }
+
+  public SpinPolicy getSpinPolicy()
+  {
+    return spinPolicy;
+  }
+
+  public void setSpinPolicy(SpinPolicy spinPolicy)
+  {
+    this.spinPolicy = spinPolicy;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapper.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapper.java
new file mode 100644
index 0000000000..d969b19787
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapper.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterResponse;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+
+import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
+import com.conversantmedia.util.concurrent.SpinPolicy;
+
+/***
+ * Wraps around the interpreter thread so that time bound SLAs can be implemented for python based execution.
+ * This class primarily implements the time constraints by utilizing the {@link InterpreterThread} class and using
+ *  a Disruptor blocking queue for high throughput. Utilizes an executor service to implement the timing SLAs.
+ */
+public class InterpreterWrapper
+{
+  private static final Logger LOG = LoggerFactory.getLogger(InterpreterWrapper.class);
+
+  /* Reference to the interpreter thread which executes requests in memory  */
+  private transient InterpreterThread interpreterThread;
+
+  /* Spin policy to use  for the disruptor implementation */
+  private transient SpinPolicy cpuSpinPolicyForWaitingInBuffer = SpinPolicy.WAITING;
+
+  private int bufferCapacity = 16; // Represents the capacity of the request and response queues
+
+  private String interpreterId;
+
+  /* Represents the actual thread instance running under the Executor service  */
+  private transient Future<?> handleToJepRunner;
+
+  private ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+  private transient BlockingQueue<PythonRequestResponse> requestQueue;
+  private transient BlockingQueue<PythonRequestResponse> responseQueue;
+
+  /* Represents the queue into which all of the stragglers will be pushed into by the interpreter thread */
+  private transient volatile BlockingQueue<PythonRequestResponse> delayedResponsesQueue;
+
+  /***
+   * Constructs the interpreter wrapper instance.
+   * @param interpreterId A string that can be used to represent the interpreter id that is passed onto the actual
+   *                      thread that is executing the commands
+   * @param delayedResponsesQueueRef The queue into which all of the straggler responses will end up
+   * @param spinPolicyForWaitingInRequestQueue The spin policy used for the request and response queues
+   */
+  public InterpreterWrapper(String interpreterId,BlockingQueue<PythonRequestResponse> delayedResponsesQueueRef,
+      SpinPolicy spinPolicyForWaitingInRequestQueue)
+  {
+    delayedResponsesQueue = delayedResponsesQueueRef;
+    this.interpreterId = interpreterId;
+    this.cpuSpinPolicyForWaitingInBuffer = spinPolicyForWaitingInRequestQueue;
+    requestQueue = new DisruptorBlockingQueue<>(bufferCapacity,cpuSpinPolicyForWaitingInBuffer);
+    responseQueue =  new DisruptorBlockingQueue<>(bufferCapacity,cpuSpinPolicyForWaitingInBuffer);
+    interpreterThread = new InterpreterThread(requestQueue,responseQueue,interpreterId);
+  }
+
+  /**
+   * Invokes the interpreter thread pre initialization logic
+   * @param preInitConfigs A set of key value pairs that are used to initialize the actual interpreter. See constants
+   *                       defined in {@link InterpreterThread} for a list of keys available
+   * @throws ApexPythonInterpreterException if the pre-initialization logic could not be executed for whatever reasons
+   */
+  public void preInitInterpreter(Map<PythonInterpreterConfig, Object> preInitConfigs) throws ApexPythonInterpreterException
+  {
+    interpreterThread.preInitInterpreter(preInitConfigs);
+  }
+
+  /***
+   * Starts the interpreter thread that this class wraps by submitting it to an executor service
+   * @throws ApexPythonInterpreterException
+   */
+  public void startInterpreter() throws ApexPythonInterpreterException
+  {
+    handleToJepRunner = executorService.submit(interpreterThread);
+  }
+
+  /***
+   * Builds a response object for the incoming request object.
+   * @param req Represents the incoming request for which a response needs to be generated.
+   * @param windowId The Operator window ID ( used to choose the interpreter thread while choosing to execute the
+   *                  logic from a pool of worker threads )
+   * @param requestId The request ID perhaps coming from the base python operator. Only used to optimize the right
+   *                  interpreter lookup from a pool of worker interpreter threads.
+   * @param <T> The template of the return type
+   * @return An object of type {@link PythonRequestResponse} is returned which encompasses both request and response.
+   */
+  private <T> PythonRequestResponse<T> buildRequestRespObject(PythonInterpreterRequest<T> req,
+      long windowId,long requestId)
+  {
+    PythonRequestResponse<T> requestResponse = new PythonRequestResponse<>();
+    requestResponse.setPythonInterpreterRequest(req);
+    PythonInterpreterResponse<T> response = new PythonInterpreterResponse<>(req.getExpectedReturnType());
+    requestResponse.setPythonInterpreterResponse(response);
+    requestResponse.setRequestStartTime(System.currentTimeMillis());
+    requestResponse.setRequestId(requestId);
+    requestResponse.setWindowId(windowId);
+    return requestResponse;
+  }
+
+  /***
+   * Handles the logic that is common across all methods of invocation of the in-memory interpreter. This
+   *  includes draining any stragglers and matching the request to any of the responses that arrive in the
+   *   response queue, possibly due to previous requests
+   * @param requestResponse The wrapper object that carries the request and into which the response is set
+   * @param req The request that contains the timeout SLAs
+   * @param <T> Java templating signature
+   * @return A response to the original incoming request, null if the response did not arrive within given SLA.
+   * @throws InterruptedException if interrupted while waiting for the response queue.
+   */
+  public <T> PythonRequestResponse<T> processRequest(PythonRequestResponse requestResponse,
+      PythonInterpreterRequest<T> req) throws InterruptedException
+  {
+    List<PythonRequestResponse> drainedResults = new ArrayList<>();
+    PythonRequestResponse currentRequestWithResponse = null;
+    boolean isCurrentRequestProcessed = false;
+    long timeOutInNanos = TimeUnit.NANOSECONDS.convert(req.getTimeout(),req.getTimeUnit());
+    // drain any previous responses that were returned while the Apex operator is processing
+    responseQueue.drainTo(drainedResults);
+    LOG.debug("Draining previous request responses if any " + drainedResults.size());
+    for (PythonRequestResponse oldRequestResponse : drainedResults) {
+      delayedResponsesQueue.put(oldRequestResponse);
+    }
+    // We first set a timer to see how long it actually took for the response to arrive.
+    // It is possible that a response arrived due to a previous request and hence this need for the timer
+    // which tracks the time for the current request.
+    long currentStart = System.nanoTime();
+    long timeLeftToCompleteProcessing = timeOutInNanos;
+    LOG.debug("Submitting the interpreter Request with time out in nanos as " + timeOutInNanos);
+    requestQueue.put(requestResponse);
+    while ( (!isCurrentRequestProcessed) && ( timeLeftToCompleteProcessing > 0 )) {
+      // ensures we are blocked till the time limit
+      currentRequestWithResponse = responseQueue.poll(timeLeftToCompleteProcessing, TimeUnit.NANOSECONDS);
+      timeLeftToCompleteProcessing = timeLeftToCompleteProcessing - ( System.nanoTime() - currentStart );
+      currentStart = System.nanoTime();
+      if (currentRequestWithResponse != null) {
+        if ( (requestResponse.getRequestId() == currentRequestWithResponse.getRequestId()) &&
+            (requestResponse.getWindowId() == currentRequestWithResponse.getWindowId()) ) {
+          isCurrentRequestProcessed = true;
+          break;
+        } else {
+          delayedResponsesQueue.put(currentRequestWithResponse);
+        }
+      } else {
+        LOG.debug(" Processing of request could not be completed on time");
+      }
+    }
+    if (isCurrentRequestProcessed) {
+      LOG.debug("Response could be processed within time SLA");
+      return currentRequestWithResponse;
+    } else {
+      LOG.debug("Response could not be processed within time SLA");
+      return null;
+    }
+  }
+
+  /***
+   * Implements the time based SLA over the interpreter's run commands implementation. See
+   *  {@link InterpreterThread#runCommands(List)}
+   * @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param request The payload of the request
+   * @return A response object with the results of the execution. Null if the request could not be processed on time.
+   * @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
+   *  queue
+   */
+  public PythonRequestResponse<Void> runCommands(long windowId, long requestId,
+      PythonInterpreterRequest<Void> request) throws InterruptedException
+  {
+    request.setCommandType(PythonCommandType.GENERIC_COMMANDS);
+    PythonRequestResponse<Void> requestResponse = buildRequestRespObject(request,windowId,requestId);
+    return processRequest(requestResponse,request);
+  }
+
+  /***
+   * Implements the time based SLA over the interpreter's method call implementation. See
+   *  {@link InterpreterThread#executeMethodCall(String, List, Class)}
+   * @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param request The payload of the request
+   * @return A response object with the results of the execution. Null if the request could not be processed on time.
+   * @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
+   *  queue
+   */
+  public <T> PythonRequestResponse<T> executeMethodCall(long windowId, long requestId,
+      PythonInterpreterRequest<T> request)  throws InterruptedException
+  {
+    request.setCommandType(PythonCommandType.METHOD_INVOCATION_COMMAND);
+    PythonRequestResponse<T> requestResponse = buildRequestRespObject(request, windowId,requestId);
+    return processRequest(requestResponse,request);
+  }
+
+  /***
+   * Implements the time based SLA over the interpreter's script execution implementation. See
+   *  {@link InterpreterThread#executeScript(String)}
+   * @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param request The payload of the request
+   * @return A response object with the results of the execution. Null if the request could not be processed on time.
+   * @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
+   *  queue
+   */
+  public PythonRequestResponse<Void> executeScript(long windowId,long requestId,PythonInterpreterRequest<Void> request)
+      throws InterruptedException
+  {
+    request.setCommandType(PythonCommandType.SCRIPT_COMMAND);
+    PythonRequestResponse<Void> requestResponse = buildRequestRespObject(request, windowId,requestId);
+    return processRequest(requestResponse,request);
+  }
+
+
+  /***
+   * Implements the time based SLA over the interpreter's eval implementation. See
+   *  {@link InterpreterThread#eval(String, String, Map, boolean, Class)}
+   * @param windowId The window ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param requestId The request ID as provided by the Apex operator. Used for selecting a worker from the worker pool.
+   * @param request The payload of the request
+   * @return A response object with the results of the execution. Null if the request could not be processed on time.
+   * @throws InterruptedException if interrupted while processing the wait for request or writing to delayed response
+   *  queue
+   */
+  public <T> PythonRequestResponse<T> eval(long windowId, long requestId,PythonInterpreterRequest<T> request)
+      throws InterruptedException
+  {
+    request.setCommandType(PythonCommandType.EVAL_COMMAND);
+    PythonRequestResponse<T> requestResponse = buildRequestRespObject(request,windowId,requestId);
+    return processRequest(requestResponse,request);
+  }
+
+  /***
+   * Stops the interpreter
+   * @throws ApexPythonInterpreterException if error while stopping the interpreter
+   */
+  public void stopInterpreter() throws ApexPythonInterpreterException
+  {
+    interpreterThread.setStopped(true);
+    handleToJepRunner.cancel(false);
+    executorService.shutdown();
+  }
+
+  public InterpreterThread getInterpreterThread()
+  {
+    return interpreterThread;
+  }
+
+  public void setInterpreterThread(InterpreterThread interpreterThread)
+  {
+    this.interpreterThread = interpreterThread;
+  }
+
+  public SpinPolicy getCpuSpinPolicyForWaitingInBuffer()
+  {
+    return cpuSpinPolicyForWaitingInBuffer;
+  }
+
+  public void setCpuSpinPolicyForWaitingInBuffer(SpinPolicy cpuSpinPolicyForWaitingInBuffer)
+  {
+    this.cpuSpinPolicyForWaitingInBuffer = cpuSpinPolicyForWaitingInBuffer;
+  }
+
+  public int getBufferCapacity()
+  {
+    return bufferCapacity;
+  }
+
+  public void setBufferCapacity(int bufferCapacity)
+  {
+    this.bufferCapacity = bufferCapacity;
+  }
+
+  public String getInterpreterId()
+  {
+    return interpreterId;
+  }
+
+  public void setInterpreterId(String interpreterId)
+  {
+    this.interpreterId = interpreterId;
+  }
+
+  public BlockingQueue<PythonRequestResponse> getRequestQueue()
+  {
+    return requestQueue;
+  }
+
+  public void setRequestQueue(BlockingQueue<PythonRequestResponse> requestQueue)
+  {
+    this.requestQueue = requestQueue;
+  }
+
+  public BlockingQueue<PythonRequestResponse> getResponseQueue()
+  {
+    return responseQueue;
+  }
+
+  public void setResponseQueue(BlockingQueue<PythonRequestResponse> responseQueue)
+  {
+    this.responseQueue = responseQueue;
+  }
+
+  public BlockingQueue<PythonRequestResponse> getDelayedResponsesQueue()
+  {
+    return delayedResponsesQueue;
+  }
+
+  public void setDelayedResponsesQueue(BlockingQueue<PythonRequestResponse> delayedResponsesQueue)
+  {
+    this.delayedResponsesQueue = delayedResponsesQueue;
+  }
+
+  public boolean isCurrentlyBusy()
+  {
+    return interpreterThread.isBusy();
+  }
+
+
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/JepPythonEngine.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/JepPythonEngine.java
new file mode 100644
index 0000000000..b57504290a
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/JepPythonEngine.java
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.ApexPythonEngine;
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.WorkerExecutionMode;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+
+import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
+import com.conversantmedia.util.concurrent.SpinPolicy;
+import com.google.common.primitives.Ints;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/***
+ * <p>Implements the {@link ApexPythonEngine} interface by using the JEP ( Java Embedded Python ) engine. It is an
+ *  in-memory interpreter and has the following characteristics:
+ *  <ol>
+ *    <li>The python engine allows for 4 major API patterns
+ *    <ul>
+ *      <li>Execute a method call by accepting parameters to pass to the interpreter</li>
+ *      <li>Execute a python script as given in a file path</li>
+ *      <li>Evaluate an expression and allows for passing of variables between the java code and the python
+ *      in-memory interpreter bridge</li>
+ *      <li>A handy method wherein a series of instructions can be passed in one single java call ( executed as a
+ *       sequence of python eval instructions under the hood ) </li>
+ *    </ul>
+ *    </li>
+ *    <li>Automatic garbage collection of the variables that are passed from java code to the in memory python
+ *     interpreter</li>
+ *    <li>Support for major python libraries such as Tensorflow, Keras, Scikit and XGBoost</li>
+ *    <li>The python engine uses the concept of a worker thread that is responsible for executing any of the 4
+ *     patterns mentioned above. The worker thread is implemented by {@link InterpreterThread}</li>
+ *    <li>The implementation allows for an SLA based execution model, i.e. the caller can stipulate that if the call
+ *     is not complete within a time out, the engine code returns null. See {@link InterpreterWrapper}</li>
+ *    <li>Supports the concept of stragglers, i.e. the processing of a request can complete eventually and the
+ *     result is then available from a queue called the delayed response queue</li>
+ *    <li>Supports the concept of executing a call on all of the worker threads if required. This supports the
+ *     following use cases:
+ *      <ul>
+ *        <li>Since this is an interpreter, the users can make use of an earlier call's variable definition if
+ *         need be. In such cases, the caller will have the need for a sticky thread i.e. all such calls need to
+ *          end up on the same thread.</li>
+ *        <li>Another reason is to implement the concept of Dynamic partitioning. Since the interpreter accumulates
+ *         state due to commands run on it, if a new partition is introduced at runtime, this can cause failures for
+ *          all subsequent commands as they might depend on variables created in previous windows</li>
+ *      </ul>
+ *    </li>
+ *  </ol>
+ * </p>
+ *
+ * <p> Note that the engine implementation can be used independently of an Operator, i.e. as a utility stack if
+ *  need be. Some of the API signatures need a window ID and request ID, but this does not mean that the API
+ *   signatures are bound to an operator lifecycle. These parameters are used for efficient thread usage only;
+ *    all the API needs is a monotonically increasing number.
+ * </p>
+ *
+ * <p>
+ *   JEP needs to be installed on all of the YARN nodes before the JEP engine is used, until Docker support is
+ *    available for Apex. Virtual environments are not supported yet. If multiple versions of Python are present
+ *     on the YARN nodes, ensure the JVM option java.library.path points to the right version of JEP, which in
+ *      turn ensures that the right version of Python is used at runtime.
+ * </p>
+ *
+ * <p>
+ *   JepPythonEngine works on the concept of a worker pool. The engine maintains a configurable number of workers and
+ *    each worker has a dedicated request and response queue. While this class is responsible for choosing the
+ *    right worker from the pool of workers for a given request, the {@link InterpreterWrapper} class is responsible
+ *     for maintaining the time-bound SLAs.
+ * </p>
+ *
+ */
+public class JepPythonEngine implements ApexPythonEngine
+{
+  private static final Logger LOG = LoggerFactory.getLogger(JepPythonEngine.class);
+
+  /* Size of the worker pool */
+  private int numWorkerThreads = 3;
+
+  /* A name that can be used while logging messages and also used to set thread names */
+  private String threadGroupName;
+
+  private static final String JEP_LIBRARY_NAME = "jep";
+
+  private transient List<PythonRequestResponse> commandHistory = new ArrayList<>();
+
+  /* Spin policy for the disruptor queue implementation */
+  private transient SpinPolicy cpuSpinPolicyForWaitingInBuffer = SpinPolicy.WAITING;
+
+  // Represents the number of responses that can be held in the queue
+  private int bufferCapacity = 64;
+
+  /* Time in milliseconds to sleep after the interpreter threads are started, while the interpreters initialize.
+  Note that booting the in-memory interpreter can be a really heavy process depending on the libraries that
+   are being loaded, hence this variable */
+  private long sleepTimeAfterInterpreterStart = 3000; // 3 secs
+
+  /**
+   * Represents the queue into which all the stragglers are drained
+   */
+  private transient BlockingQueue<PythonRequestResponse> delayedResponseQueue =
+      new DisruptorBlockingQueue<>(bufferCapacity,cpuSpinPolicyForWaitingInBuffer);
+
+  private List<InterpreterWrapper> workers = new ArrayList<>();
+
+  private Map<PythonInterpreterConfig, Object> preInitConfigs;
+
+  private long numStarvedReturns = 0;
+
+  /***
+   * Creates the JEP Python engine instance but does not start the interpreters yet
+   * @param threadGroupName A name that represents all the workers in this thread group
+   * @param numWorkerThreads Number of workers in the worker pool
+   */
+  public JepPythonEngine(String threadGroupName,int numWorkerThreads)
+  {
+    this.numWorkerThreads = numWorkerThreads;
+    this.threadGroupName = threadGroupName;
+  }
+
+  private void initWorkers() throws ApexPythonInterpreterException
+  {
+    LOG.info("Attempting to load the JEP dynamic library");
+    System.loadLibrary(JEP_LIBRARY_NAME);
+    LOG.info("Successfully loaded the JEP dynamic library in memory");
+    SpinPolicy spinPolicyForReqQueue = SpinPolicy.WAITING;
+    if (preInitConfigs.containsKey(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY)) {
+      spinPolicyForReqQueue = (SpinPolicy)preInitConfigs.get(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY);
+    }
+    for ( int i = 0; i < numWorkerThreads; i++) {
+      InterpreterWrapper aWorker = new InterpreterWrapper(threadGroupName + "-" + i,delayedResponseQueue,
+          spinPolicyForReqQueue);
+      aWorker.preInitInterpreter(preInitConfigs);
+      aWorker.startInterpreter();
+      workers.add(aWorker);
+    }
+  }
+
+  /***
+   * Used to select the right worker from the worker pool. The goal is to round robin the workers as far as possible.
+   *  If the preferred worker is busy, the next available worker is chosen instead.
+   * @param requestId Used to round robin the requests. This does not mean that only an operator can use this engine.
+   * @return A worker from the worker pool. Null if all workers are busy.
+   */
+  protected InterpreterWrapper selectWorkerForCurrentCall(long requestId)
+  {
+    int slotToLookFor = Math.abs(Ints.saturatedCast(requestId) % numWorkerThreads);
+    LOG.debug("Slot that is being looked for in the worker pool " + slotToLookFor);
+    boolean isWorkerFound = false;
+    int numWorkersScannedForAvailability = 0;
+    InterpreterWrapper aWorker = null;
+    while ( (!isWorkerFound) && (numWorkersScannedForAvailability < numWorkerThreads)) {
+      aWorker = workers.get(slotToLookFor);
+      numWorkersScannedForAvailability  = numWorkersScannedForAvailability + 1;
+      if (!aWorker.isCurrentlyBusy()) {
+        isWorkerFound = true;
+        LOG.debug("Found worker with index as  " + slotToLookFor);
+        break;
+      } else {
+        LOG.debug("Thread ID is currently busy " + aWorker.getInterpreterId());
+        slotToLookFor = slotToLookFor + 1;
+        if ( slotToLookFor == numWorkerThreads) {
+          slotToLookFor = 0;
+        }
+      }
+    }
+    if (isWorkerFound) {
+      return aWorker;
+    } else {
+      numStarvedReturns += 1;
+      return null;
+    }
+  }
+
+  /***
+   * See {@link ApexPythonEngine#preInitInterpreter(Map)} for more details
+   * @param preInitConfigs The configuration that is going to be used by the interpreter. See constants
+   *                       defined in {@link PythonInterpreterConfig} for a list of keys available
+   * @throws ApexPythonInterpreterException if there is an issue while executing the pre-init interpreter logic
+   */
+  @Override
+  public void preInitInterpreter(Map<PythonInterpreterConfig, Object> preInitConfigs)
+    throws ApexPythonInterpreterException
+  {
+    this.preInitConfigs = preInitConfigs;
+  }
+
+  /***
+   * Starts all of the worker threads. Also sleeps for a few moments to allow "fat" frameworks like Tensorflow to
+   *  boot completely.
+   * @throws ApexPythonInterpreterException
+   */
+  @Override
+  public void startInterpreter() throws ApexPythonInterpreterException
+  {
+    initWorkers();
+    try {
+      if (sleepTimeAfterInterpreterStart > 0) {
+        LOG.debug("Sleeping to let the interpreter boot up in memory");
+        Thread.sleep(sleepTimeAfterInterpreterStart);
+      }
+    } catch (InterruptedException e) {
+      throw new ApexPythonInterpreterException(e);
+    }
+  }
+
+  /***
+   * Used to execute all of the commands from the command history when an operator is instantiating a new instance of
+   *  the engine. Used by the dynamic partitioner to let a newly provisioned operator catch up to the state of all of
+   *  the remaining operator instances
+   * @throws ApexPythonInterpreterException
+   */
+  @Override
+  public void postStartInterpreter() throws ApexPythonInterpreterException
+  {
+    for ( InterpreterWrapper wrapper : workers) {
+      for (PythonRequestResponse requestResponse : commandHistory) {
+        PythonInterpreterRequest requestPayload = requestResponse.getPythonInterpreterRequest();
+        try {
+          wrapper.processRequest(requestResponse,requestPayload);
+        } catch (InterruptedException e) {
+          throw new ApexPythonInterpreterException(e);
+        }
+      }
+    }
+  }
+
+  /***
+   * See {@link ApexPythonEngine#runCommands(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
+   *  details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
+   *  set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
+   *   number of worker threads
+   * @param executionMode Whether these commands need to be run on all workers or on any single worker
+   * @param windowId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+   * @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+   * @param request Represents the request to be processed.
+   * @return A map with the interpreter ID as key and the response from that interpreter as the value.
+   * @throws ApexPythonInterpreterException
+   */
+  @Override
+  public Map<String,PythonRequestResponse<Void>> runCommands(WorkerExecutionMode executionMode,long windowId,
+      long requestId, PythonInterpreterRequest<Void> request) throws ApexPythonInterpreterException
+  {
+    checkNotNullConditions(request);
+    checkNotNull(request.getGenericCommandsRequestPayload(), "Run commands payload not set");
+    checkNotNull(request.getGenericCommandsRequestPayload().getGenericCommands(),
+        "Commands that need to be run not set");
+    Map<String,PythonRequestResponse<Void>> returnStatus = new HashMap<>();
+    PythonRequestResponse lastSuccessfullySubmittedRequest = null;
+    try {
+      if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
+        LOG.debug("Executing run commands on all of the interpreter worker threads");
+        long  timeOutPerWorker = TimeUnit.NANOSECONDS.convert(request.getTimeout(),request.getTimeUnit()) /
+            numWorkerThreads;
+        LOG.debug("Allocating " + timeOutPerWorker + " nanoseconds for each of the worker threads");
+        if ( timeOutPerWorker == 0) {
+          timeOutPerWorker = 1;
+        }
+        request.setTimeout(timeOutPerWorker);
+        request.setTimeUnit(TimeUnit.NANOSECONDS);
+        for ( InterpreterWrapper wrapper : workers) {
+          lastSuccessfullySubmittedRequest = wrapper.runCommands(windowId,requestId,request);
+          if (lastSuccessfullySubmittedRequest != null) {
+            returnStatus.put(wrapper.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          }
+        }
+        if ( returnStatus.size() > 0) {
+          commandHistory.add(lastSuccessfullySubmittedRequest);
+        }
+      } else {
+        InterpreterWrapper currentThread = null;
+        if (executionMode.equals(WorkerExecutionMode.ANY)) {
+          LOG.debug("Executing run commands on a single interpreter worker thread");
+          currentThread = selectWorkerForCurrentCall(requestId);
+        }
+        if (executionMode.equals(WorkerExecutionMode.STICKY)) {
+          currentThread = workers.get(Math.abs(request.hashCode() % numWorkerThreads));
+          LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
+        }
+        if (currentThread != null) {
+          lastSuccessfullySubmittedRequest = currentThread.runCommands(windowId, requestId, request);
+          if (lastSuccessfullySubmittedRequest != null) {
+            returnStatus.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          }
+        } else {
+          throw new ApexPythonInterpreterException("No free interpreter threads available." +
+            " Consider increasing workers and relaunch");
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new ApexPythonInterpreterException(e);
+    }
+    return returnStatus;
+  }
+
+  /***
+   *  See {@link ApexPythonEngine#executeMethodCall(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
+   *  details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
+   *  set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
+   *   number of worker threads
+   *
+   * @param executionMode If the method call needs to be invoked on all workers or any single worker
+   * @param windowId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+   * @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+   * @param req Represents the request to be processed.
+   * @param <T> The Java type of the value returned by the method call
+   * @return A map with the interpreter ID as key and the response from that interpreter as the value.
+   * @throws ApexPythonInterpreterException
+   */
+  @Override
+  public <T> Map<String,PythonRequestResponse<T>> executeMethodCall(WorkerExecutionMode executionMode,long windowId,
+      long requestId, PythonInterpreterRequest<T> req) throws ApexPythonInterpreterException
+  {
+    checkNotNullConditions(req);
+    checkNotNull(req.getMethodCallRequest(), "Method call info not set");
+    checkNotNull(req.getMethodCallRequest().getNameOfMethod(), "Method name not set");
+    Map<String,PythonRequestResponse<T>> returnStatus = new HashMap<>();
+    req.setCommandType(PythonCommandType.METHOD_INVOCATION_COMMAND);
+    PythonRequestResponse lastSuccessfullySubmittedRequest = null;
+    try {
+      if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
+        long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(req.getTimeout(), req.getTimeUnit()) /
+            numWorkerThreads;
+        if ( timeOutPerWorker == 0) {
+          timeOutPerWorker = 1;
+        }
+        req.setTimeout(timeOutPerWorker);
+        req.setTimeUnit(TimeUnit.NANOSECONDS);
+        for ( InterpreterWrapper wrapper : workers) {
+          lastSuccessfullySubmittedRequest = wrapper.executeMethodCall(windowId,requestId,req);
+          if ( lastSuccessfullySubmittedRequest != null) {
+            returnStatus.put(wrapper.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          }
+        }
+        if ( returnStatus.size() > 0) {
+          commandHistory.add(lastSuccessfullySubmittedRequest);
+        }
+      } else {
+        InterpreterWrapper currentThread = null;
+        if (executionMode.equals(WorkerExecutionMode.ANY)) {
+          currentThread = selectWorkerForCurrentCall(requestId);
+        }
+        if (executionMode.equals(WorkerExecutionMode.STICKY)) {
+          currentThread = workers.get(Math.abs(req.hashCode() % numWorkerThreads));
+          LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
+        }
+        if (currentThread != null) {
+          lastSuccessfullySubmittedRequest = currentThread.executeMethodCall(windowId, requestId, req);
+          if (lastSuccessfullySubmittedRequest != null) {
+            returnStatus.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          }
+        } else {
+          throw new ApexPythonInterpreterException("No free interpreter threads available." +
+            " Consider increasing workers and relaunch");
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new ApexPythonInterpreterException(e);
+    }
+    return returnStatus;
+  }
+
+  /***
+   *   See {@link ApexPythonEngine#executeScript(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
+   *  details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
+   *  set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
+   *   number of worker threads
+   * @param executionMode If the script needs to be executed on all workers or any single worker
+   * @param windowId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+   * @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+   * @param req Represents the request to be processed.
+   * @return A map with the interpreter ID as key and the response from that interpreter as the value.
+   * @throws ApexPythonInterpreterException
+   */
+  @Override
+  public Map<String,PythonRequestResponse<Void>> executeScript(WorkerExecutionMode executionMode,long windowId,
+      long requestId, PythonInterpreterRequest<Void> req)
+    throws ApexPythonInterpreterException
+  {
+    checkNotNullConditions(req);
+    checkNotNull(req.getScriptExecutionRequestPayload(), "Script execution info not set");
+    checkNotNull(req.getScriptExecutionRequestPayload().getScriptName(), "Script name not set");
+    Map<String,PythonRequestResponse<Void>> returnStatus = new HashMap<>();
+    PythonRequestResponse lastSuccessfullySubmittedRequest = null;
+    try {
+      if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
+        long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(req.getTimeout(), req.getTimeUnit()) /
+            numWorkerThreads;
+        if ( timeOutPerWorker == 0) {
+          timeOutPerWorker = 1;
+        }
+        req.setTimeout(timeOutPerWorker);
+        req.setTimeUnit(TimeUnit.NANOSECONDS);
+        for ( InterpreterWrapper wrapper : workers) {
+          lastSuccessfullySubmittedRequest = wrapper.executeScript(windowId,requestId,req);
+          if (lastSuccessfullySubmittedRequest != null) {
+            returnStatus.put(wrapper.getInterpreterId(),lastSuccessfullySubmittedRequest);
+          }
+        }
+        if ( returnStatus.size() > 0) {
+          commandHistory.add(lastSuccessfullySubmittedRequest);
+        }
+      } else {
+        InterpreterWrapper currentThread = null;
+        if (executionMode.equals(WorkerExecutionMode.ANY)) {
+          currentThread = selectWorkerForCurrentCall(requestId);
+        }
+        if (executionMode.equals(WorkerExecutionMode.STICKY)) {
+          currentThread = workers.get(Math.abs(req.hashCode() % numWorkerThreads));
+          LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
+        }
+        if (currentThread != null) {
+          lastSuccessfullySubmittedRequest = currentThread.executeScript(windowId, requestId, req);
+          if (lastSuccessfullySubmittedRequest != null) {
+            returnStatus.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          }
+        } else {
+          throw new ApexPythonInterpreterException("No free interpreter threads available." +
+            " Consider increasing workers and relaunch");
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new ApexPythonInterpreterException(e);
+    }
+    return returnStatus;
+  }
+
+  private void checkNotNullConditions(PythonInterpreterRequest request)
+  {
+    checkNotNull(request, "Request object cannot be null");
+    checkNotNull(request.getTimeout(), "Time out value not set");
+    checkNotNull(request.getTimeUnit(), "Time out unit not set");
+  }
+
+  /***
+   *  See {@link ApexPythonEngine#eval(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for more
+   *  details. Note that if the worker execution mode {@link WorkerExecutionMode} is BROADCAST, then the time SLA
+   *  set is the total time for all workers i.e. each worker is given a ( total time / N ) where N is the current
+   *   number of worker threads
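+   * @param executionMode If the expression needs to be evaluated on all workers or any single worker
+   * @param windowId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+   * @param requestId used to select the worker from the worker pool. Can be any long if an operator is not using this.
+   * @param request Represents the request to be processed.
+   * @param <T> The Java type of the value returned by the eval expression
+   * @return A map with the interpreter ID as key and the response from that interpreter as the value.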
+   * @throws ApexPythonInterpreterException
+   */
+  @Override
+  public <T> Map<String,PythonRequestResponse<T>> eval(WorkerExecutionMode executionMode,long windowId, long requestId,
+      PythonInterpreterRequest<T> request) throws ApexPythonInterpreterException
+  {
+    checkNotNullConditions(request);
+    checkNotNull(request.getEvalCommandRequestPayload(), "Eval command info not set");
+    checkNotNull(request.getEvalCommandRequestPayload().getEvalCommand(),"Eval command not set");
+    Map<String,PythonRequestResponse<T>> statusOfEval = new HashMap<>();
+    PythonRequestResponse lastSuccessfullySubmittedRequest = null;
+    try {
+      if (executionMode.equals(WorkerExecutionMode.BROADCAST)) {
+        long timeOutPerWorker = TimeUnit.NANOSECONDS.convert(request.getTimeout(), request.getTimeUnit()) /
+            numWorkerThreads;
+        if ( timeOutPerWorker == 0) {
+          timeOutPerWorker = 1;
+        }
+        request.setTimeout(timeOutPerWorker);
+        request.setTimeUnit(TimeUnit.NANOSECONDS);
+        for ( InterpreterWrapper wrapper : workers) {
+          lastSuccessfullySubmittedRequest = wrapper.eval(windowId,requestId,request);
+          if (lastSuccessfullySubmittedRequest != null) {
+            statusOfEval.put(wrapper.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          }
+        }
+        if ( statusOfEval.size() > 0) {
+          commandHistory.add(lastSuccessfullySubmittedRequest);
+        }
+      } else {
+        InterpreterWrapper currentThread = null;
+        if (executionMode.equals(WorkerExecutionMode.ANY)) {
+          currentThread = selectWorkerForCurrentCall(requestId);
+        }
+        if (executionMode.equals(WorkerExecutionMode.STICKY)) {
+          currentThread = workers.get(Math.abs(request.hashCode() % numWorkerThreads));
+          LOG.debug(" Choosing sticky worker " + currentThread.getInterpreterId());
+        }
+        if (currentThread != null) {
+          lastSuccessfullySubmittedRequest = currentThread.eval(windowId, requestId, request);
+          if (lastSuccessfullySubmittedRequest != null) {
+            statusOfEval.put(currentThread.getInterpreterId(), lastSuccessfullySubmittedRequest);
+          }
+        } else {
+          throw new ApexPythonInterpreterException("No free interpreter threads available." +
+            " Consider increasing workers and relaunch");
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new ApexPythonInterpreterException(e);
+    }
+    return statusOfEval;
+  }
+
+  @Override
+  public void stopInterpreter() throws ApexPythonInterpreterException
+  {
+    for ( InterpreterWrapper wrapper : workers) {
+      wrapper.stopInterpreter();
+    }
+  }
+
+  public int getNumWorkerThreads()
+  {
+    return numWorkerThreads;
+  }
+
+  public void setNumWorkerThreads(int numWorkerThreads)
+  {
+    this.numWorkerThreads = numWorkerThreads;
+  }
+
+  public List<InterpreterWrapper> getWorkers()
+  {
+    return workers;
+  }
+
+  public void setWorkers(List<InterpreterWrapper> workers)
+  {
+    this.workers = workers;
+  }
+
+  @Override
+  public List<PythonRequestResponse> getCommandHistory()
+  {
+    return commandHistory;
+  }
+
+  @Override
+  public void setCommandHistory(List<PythonRequestResponse> commandHistory)
+  {
+    this.commandHistory = commandHistory;
+  }
+
+  public long getSleepTimeAfterInterpreterStart()
+  {
+    return sleepTimeAfterInterpreterStart;
+  }
+
+  public void setSleepTimeAfterInterpreterStart(long sleepTimeAfterInterpreterStart)
+  {
+    this.sleepTimeAfterInterpreterStart = sleepTimeAfterInterpreterStart;
+  }
+
+  @Override
+  public BlockingQueue<PythonRequestResponse> getDelayedResponseQueue()
+  {
+    return delayedResponseQueue;
+  }
+
+  @Override
+  public void setDelayedResponseQueue(BlockingQueue<PythonRequestResponse> delayedResponseQueue)
+  {
+    this.delayedResponseQueue = delayedResponseQueue;
+  }
+
+  public SpinPolicy getCpuSpinPolicyForWaitingInBuffer()
+  {
+    return cpuSpinPolicyForWaitingInBuffer;
+  }
+
+  public void setCpuSpinPolicyForWaitingInBuffer(SpinPolicy cpuSpinPolicyForWaitingInBuffer)
+  {
+    this.cpuSpinPolicyForWaitingInBuffer = cpuSpinPolicyForWaitingInBuffer;
+  }
+
+  public int getBufferCapacity()
+  {
+    return bufferCapacity;
+  }
+
+  public void setBufferCapacity(int bufferCapacity)
+  {
+    this.bufferCapacity = bufferCapacity;
+  }
+
+  @Override
+  public long getNumStarvedReturns()
+  {
+    return numStarvedReturns;
+  }
+
+  @Override
+  public void setNumStarvedReturns(long numStarvedReturns)
+  {
+    this.numStarvedReturns = numStarvedReturns;
+  }
+}
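
The worker selection arithmetic in JepPythonEngine above can be sketched in isolation. This is a minimal, hypothetical illustration and not code from the pull request: the class WorkerSelectionSketch and its method names are assumptions, a list of booleans stands in for InterpreterWrapper.isCurrentlyBusy(), and Math.floorMod is swapped in for the remainder on Ints.saturatedCast(requestId) and hashCode() so that negative inputs still map to a valid slot.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Hypothetical, self-contained sketch; not part of the pull request. */
final class WorkerSelectionSketch
{
  private WorkerSelectionSketch()
  {
  }

  /** Maps any long, including negative values, to a slot in [0, numWorkers). */
  static int startSlot(long requestId, int numWorkers)
  {
    return (int)Math.floorMod(requestId, (long)numWorkers);
  }

  /**
   * ANY mode: start at the slot derived from the request ID and scan each worker at most once.
   * Returns the index of the first idle worker, or -1 if every worker is busy.
   */
  static int selectIdleWorker(long requestId, List<Boolean> busyFlags)
  {
    int numWorkers = busyFlags.size();
    int slot = startSlot(requestId, numWorkers);
    for (int scanned = 0; scanned < numWorkers; scanned++) {
      if (!busyFlags.get(slot)) {
        return slot;
      }
      slot = (slot + 1) % numWorkers; // wrap around to the first worker
    }
    return -1;
  }

  /** STICKY mode: the same request hash always lands on the same worker. */
  static int stickySlot(int requestHash, int numWorkers)
  {
    return Math.floorMod(requestHash, numWorkers);
  }

  /** BROADCAST mode: the total timeout is split evenly across workers, with a floor of 1 ns. */
  static long perWorkerTimeoutNanos(long timeout, TimeUnit unit, int numWorkers)
  {
    long perWorker = TimeUnit.NANOSECONDS.convert(timeout, unit) / numWorkers;
    return Math.max(1L, perWorker);
  }

  public static void main(String[] args)
  {
    List<Boolean> busy = Arrays.asList(true, true, false);
    System.out.println(selectIdleWorker(4L, busy));
    System.out.println(stickySlot(-7, 3));
    System.out.println(perWorkerTimeoutNanos(3, TimeUnit.MILLISECONDS, 3));
  }
}
```

With three workers of which only the last is idle, request ID 4 starts at slot 1, skips it and settles on slot 2. A negative hash of -7 also maps to slot 2 rather than to a negative index, and a 3 ms broadcast budget gives each of the three workers 1,000,000 ns.
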
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/SpinPolicy.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/SpinPolicy.java
new file mode 100644
index 0000000000..dd75956340
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/SpinPolicy.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+public enum SpinPolicy
+{
+  SLEEP,
+  BUSY_SPIN
+}
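
The diff does not show how the SLEEP and BUSY_SPIN constants are consumed. Assuming they choose between sleeping and spinning while a caller waits on a response queue, a time-bound wait could look roughly like the sketch below. Everything in it (TimeBoundPollSketch, WaitStyle, pollWithDeadline, the 1 ms sleep) is hypothetical and only illustrates the trade-off. Returning null on a missed deadline mirrors the SLA behaviour described in the JepPythonEngine Javadoc.

```java
import java.util.Queue;
import java.util.concurrent.TimeUnit;

/** Hypothetical, self-contained sketch; not part of the pull request. */
final class TimeBoundPollSketch
{
  /** Stand-in for the two waiting styles; the names mirror the enum in the diff. */
  enum WaitStyle
  {
    SLEEP,
    BUSY_SPIN
  }

  private TimeBoundPollSketch()
  {
  }

  /**
   * Polls the queue until an item arrives or the deadline passes.
   * Returns null if nothing arrived in time or the thread was interrupted.
   */
  static <T> T pollWithDeadline(Queue<T> queue, long timeout, TimeUnit unit, WaitStyle style)
  {
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    while (true) {
      T item = queue.poll();
      if (item != null) {
        return item;
      }
      if (System.nanoTime() - deadline >= 0) {
        return null; // deadline missed: the caller gets null
      }
      if (style == WaitStyle.SLEEP) {
        try {
          Thread.sleep(1); // yields the CPU at the cost of wake-up latency
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt for the caller
          return null;
        }
      }
      // BUSY_SPIN: loop again immediately, trading CPU time for lower latency
    }
  }
}
```

Sleeping keeps an idle worker cheap but adds up to a millisecond of latency per check in this sketch, while spinning reacts sooner and occupies a core for the whole wait.
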
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/jep/package-info.java b/python/src/main/java/org/apache/apex/malhar/python/base/jep/package-info.java
new file mode 100644
index 0000000000..9b05d733ef
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/jep/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.python.base.jep;
+
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/package-info.java b/python/src/main/java/org/apache/apex/malhar/python/base/package-info.java
new file mode 100644
index 0000000000..ba84249ae4
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.python.base;
+
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/AbstractPythonExecutionPartitioner.java b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/AbstractPythonExecutionPartitioner.java
new file mode 100644
index 0000000000..6312b2911d
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/AbstractPythonExecutionPartitioner.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.partitioner;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.util.KryoCloneUtils;
+import org.apache.apex.malhar.python.base.BasePythonExecutionOperator;
+
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+
+/***
+ * Abstract partitioner for instances of {@link BasePythonExecutionOperator}. This class only provides the
+ *  scaffolding; the partitioning logic comes from subclasses. See {@link ThreadStarvationBasedPartitioner} for details.
+ */
+public abstract class AbstractPythonExecutionPartitioner implements Partitioner<BasePythonExecutionOperator>
+{
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractPythonExecutionPartitioner.class);
+
+  @JsonIgnore
+  protected BasePythonExecutionOperator prototypePythonOperator;
+
+  public AbstractPythonExecutionPartitioner(BasePythonExecutionOperator prototypePythonOperator)
+  {
+    this.prototypePythonOperator = prototypePythonOperator;
+  }
+
+  @Override
+  public Collection<Partition<BasePythonExecutionOperator>> definePartitions(
+      Collection<Partition<BasePythonExecutionOperator>> partitions, PartitioningContext context)
+  {
+    List<Partition<BasePythonExecutionOperator>> requiredPartitions = buildTargetPartitions(partitions, context);
+    return requiredPartitions;
+  }
+
+  protected abstract List<Partition<BasePythonExecutionOperator>> buildTargetPartitions(
+      Collection<Partition<BasePythonExecutionOperator>> partitions, PartitioningContext context);
+
+  @Override
+  public void partitioned(Map<Integer, Partition<BasePythonExecutionOperator>> partitions)
+  {
+
+  }
+
+  public Partitioner.Partition<BasePythonExecutionOperator> clonePartition()
+  {
+    Partitioner.Partition<BasePythonExecutionOperator> clonedPythonOperator =
+        new DefaultPartition<>(KryoCloneUtils.cloneObject(prototypePythonOperator));
+    return clonedPythonOperator;
+  }
+}
+
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/PythonExecutionPartitionerType.java b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/PythonExecutionPartitionerType.java
new file mode 100644
index 0000000000..3acb32c62d
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/PythonExecutionPartitionerType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.partitioner;
+
+public enum PythonExecutionPartitionerType
+{
+  THREAD_STARVATION_BASED;
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/ThreadStarvationBasedPartitioner.java b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/ThreadStarvationBasedPartitioner.java
new file mode 100644
index 0000000000..f93ead931f
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/ThreadStarvationBasedPartitioner.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.partitioner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.BasePythonExecutionOperator;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+
+/***
+ * A partitioner that adds another instance of the python operator when the percentage of non-serviced requests
+ *  between any two checkpoint boundaries exceeds a threshold. The threshold is configurable and is expressed
+ *   as a percentage.
+ */
+public class ThreadStarvationBasedPartitioner extends AbstractPythonExecutionPartitioner
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ThreadStarvationBasedPartitioner.class);
+
+  private float threadStarvationThresholdRatio;
+
+  public ThreadStarvationBasedPartitioner(BasePythonExecutionOperator prototypePythonOperator)
+  {
+    super(prototypePythonOperator);
+  }
+
+  /***
+   * Calculates the partitions that are required based on the starvation observed in each checkpoint window. A
+   *  new instance is seeded with the command history of the original operator (if any) so that it starts in the
+   *   same state as the original operator when it begins processing new tuples.
+   * @param partitions The current set of partitions
+   * @param context The partitioning context
+   * @return The new set of partitions, keeping the existing ones intact and adding new ones only if needed.
+   */
+  @Override
+  protected List<Partition<BasePythonExecutionOperator>> buildTargetPartitions(
+      Collection<Partition<BasePythonExecutionOperator>> partitions, PartitioningContext context)
+  {
+    List<Partition<BasePythonExecutionOperator>> returnList = new ArrayList<>();
+    if (partitions != null) {
+      returnList.addAll(partitions);
+      for (Partition<BasePythonExecutionOperator> aCurrentPartition : partitions) {
+        BasePythonExecutionOperator anOperator = aCurrentPartition.getPartitionedInstance();
+        long starvedCount = anOperator.getNumStarvedReturns();
+        long requestsForCheckpointWindow = anOperator.getNumberOfRequestsProcessedPerCheckpoint();
+        if (requestsForCheckpointWindow != 0) { // zero when the operator is starting for the first time
+          float starvationPercent = 100f - (((float)(requestsForCheckpointWindow - starvedCount) /
+              requestsForCheckpointWindow) * 100f);
+          if (starvationPercent > anOperator.getStarvationPercentBeforeSpawningNewInstance()) {
+            LOG.info("Creating a new instance of the python operator as starvation % is " + starvationPercent);
+            Partition<BasePythonExecutionOperator> newInstance = clonePartition();
+            List<PythonRequestResponse> commandHistory = new ArrayList<>();
+            commandHistory.addAll(anOperator.getAccumulatedCommandHistory());
+            newInstance.getPartitionedInstance().setAccumulatedCommandHistory(commandHistory);
+            returnList.add(newInstance);
+          }
+        }
+      }
+    }
+    return returnList;
+  }
+
+  public float getThreadStarvationThresholdRatio()
+  {
+    return threadStarvationThresholdRatio;
+  }
+
+  public void setThreadStarvationThresholdRatio(float threadStarvationThresholdRatio)
+  {
+    this.threadStarvationThresholdRatio = threadStarvationThresholdRatio;
+  }
+}
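
Review note: the starvation percentage above is derived from two long counters, so the result depends on whether the ratio is computed in integer or floating-point arithmetic. The following is a minimal, standalone sketch of that difference; the class and method names are hypothetical and are not part of the pull request.

```java
public class StarvationPercentSketch
{
  // Hypothetical helper: the ratio is computed with long division, which
  // truncates (requests - starved) / requests to 0 whenever starved > 0.
  static float withLongDivision(long requests, long starved)
  {
    return 100 - (((requests - starved) / requests) * 100);
  }

  // Hypothetical helper: the ratio is computed in floating point, so the
  // fractional part survives.
  static float withFloatDivision(long requests, long starved)
  {
    return 100f - (((float)(requests - starved) / requests) * 100f);
  }

  public static void main(String[] args)
  {
    // 50 starved returns out of 200 requests in one checkpoint window
    System.out.println(withLongDivision(200, 50));  // prints 100.0
    System.out.println(withFloatDivision(200, 50)); // prints 25.0
  }
}
```

With long division, any non-zero starved count reports 100%, so a threshold check would fire on the first starved return in a window.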
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/package-info.java b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/package-info.java
new file mode 100644
index 0000000000..94b3e3720b
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/partitioner/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.python.base.partitioner;
+
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/EvalCommandRequestPayload.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/EvalCommandRequestPayload.java
new file mode 100644
index 0000000000..98aaff55d5
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/EvalCommandRequestPayload.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.Map;
+
+public class EvalCommandRequestPayload
+{
+  private boolean deleteVariableAfterEvalCall;
+
+  private String variableNameToExtractInEvalCall;
+
+  private String evalCommand;
+
+  private Map<String,Object> paramsForEvalCommand;
+
+  public boolean isDeleteVariableAfterEvalCall()
+  {
+    return deleteVariableAfterEvalCall;
+  }
+
+  public void setDeleteVariableAfterEvalCall(boolean deleteVariableAfterEvalCall)
+  {
+    this.deleteVariableAfterEvalCall = deleteVariableAfterEvalCall;
+  }
+
+  public String getVariableNameToExtractInEvalCall()
+  {
+    return variableNameToExtractInEvalCall;
+  }
+
+  public void setVariableNameToExtractInEvalCall(String variableNameToExtractInEvalCall)
+  {
+    this.variableNameToExtractInEvalCall = variableNameToExtractInEvalCall;
+  }
+
+  public String getEvalCommand()
+  {
+    return evalCommand;
+  }
+
+  public void setEvalCommand(String evalCommand)
+  {
+    this.evalCommand = evalCommand;
+  }
+
+  public Map<String, Object> getParamsForEvalCommand()
+  {
+    return paramsForEvalCommand;
+  }
+
+  public void setParamsForEvalCommand(Map<String, Object> paramsForEvalCommand)
+  {
+    this.paramsForEvalCommand = paramsForEvalCommand;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/GenericCommandsRequestPayload.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/GenericCommandsRequestPayload.java
new file mode 100644
index 0000000000..b037248250
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/GenericCommandsRequestPayload.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.List;
+
+public class GenericCommandsRequestPayload
+{
+  List<String> genericCommands;
+
+  public List<String> getGenericCommands()
+  {
+    return genericCommands;
+  }
+
+  public void setGenericCommands(List<String> genericCommands)
+  {
+    this.genericCommands = genericCommands;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/MethodCallRequestPayload.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/MethodCallRequestPayload.java
new file mode 100644
index 0000000000..368cff9bd7
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/MethodCallRequestPayload.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.List;
+
+public class MethodCallRequestPayload
+{
+  private String nameOfMethod;
+
+  private List<Object> args;
+
+  public String getNameOfMethod()
+  {
+    return nameOfMethod;
+  }
+
+  public void setNameOfMethod(String nameOfMethod)
+  {
+    this.nameOfMethod = nameOfMethod;
+  }
+
+  public List<Object> getArgs()
+  {
+    return args;
+  }
+
+  public void setArgs(List<Object> args)
+  {
+    this.args = args;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonCommandType.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonCommandType.java
new file mode 100644
index 0000000000..3721b84940
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonCommandType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+public enum PythonCommandType
+{
+  GENERIC_COMMANDS,
+  EVAL_COMMAND,
+  SCRIPT_COMMAND,
+  METHOD_INVOCATION_COMMAND
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterRequest.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterRequest.java
new file mode 100644
index 0000000000..1b8a46fad0
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterRequest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.concurrent.TimeUnit;
+
+public class PythonInterpreterRequest<T>
+{
+  PythonCommandType commandType;
+
+  long timeout;
+
+  TimeUnit timeUnit;
+
+  MethodCallRequestPayload methodCallRequest;
+
+  GenericCommandsRequestPayload genericCommandsRequestPayload;
+
+  EvalCommandRequestPayload evalCommandRequestPayload;
+
+  ScriptExecutionRequestPayload scriptExecutionRequestPayload;
+
+  Class<T> expectedReturnType;
+
+  // This constructor is not to be used by the user code and only exists for Kryo serialization
+  public PythonInterpreterRequest()
+  {
+  }
+
+  public PythonInterpreterRequest(Class<T> expectedReturnType)
+  {
+    this.expectedReturnType = expectedReturnType;
+  }
+
+  public PythonCommandType getCommandType()
+  {
+    return commandType;
+  }
+
+  public void setCommandType(PythonCommandType commandType)
+  {
+    this.commandType = commandType;
+  }
+
+  public MethodCallRequestPayload getMethodCallRequest()
+  {
+    return methodCallRequest;
+  }
+
+  public void setMethodCallRequest(MethodCallRequestPayload methodCallRequest)
+  {
+    this.methodCallRequest = methodCallRequest;
+  }
+
+  public GenericCommandsRequestPayload getGenericCommandsRequestPayload()
+  {
+    return genericCommandsRequestPayload;
+  }
+
+  public void setGenericCommandsRequestPayload(GenericCommandsRequestPayload genericCommandsRequestPayload)
+  {
+    this.genericCommandsRequestPayload = genericCommandsRequestPayload;
+  }
+
+  public EvalCommandRequestPayload getEvalCommandRequestPayload()
+  {
+    return evalCommandRequestPayload;
+  }
+
+  public void setEvalCommandRequestPayload(EvalCommandRequestPayload evalCommandRequestPayload)
+  {
+    this.evalCommandRequestPayload = evalCommandRequestPayload;
+  }
+
+  public ScriptExecutionRequestPayload getScriptExecutionRequestPayload()
+  {
+    return scriptExecutionRequestPayload;
+  }
+
+  public void setScriptExecutionRequestPayload(ScriptExecutionRequestPayload scriptExecutionRequestPayload)
+  {
+    this.scriptExecutionRequestPayload = scriptExecutionRequestPayload;
+  }
+
+  public Class<T> getExpectedReturnType()
+  {
+    return expectedReturnType;
+  }
+
+  public void setExpectedReturnType(Class<T> expectedReturnType)
+  {
+    this.expectedReturnType = expectedReturnType;
+  }
+
+  public long getTimeout()
+  {
+    return timeout;
+  }
+
+  public void setTimeout(long timeout)
+  {
+    this.timeout = timeout;
+  }
+
+  public TimeUnit getTimeUnit()
+  {
+    return timeUnit;
+  }
+
+  public void setTimeUnit(TimeUnit timeUnit)
+  {
+    this.timeUnit = timeUnit;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterResponse.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterResponse.java
new file mode 100644
index 0000000000..64dc1a57e7
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonInterpreterResponse.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import java.util.Map;
+
+public class PythonInterpreterResponse<T>
+{
+
+  Class<T> responseTypeClass;
+
+  Map<String,Boolean> commandStatus;
+
+  // To be used only by the Kryo serializer framework
+  public PythonInterpreterResponse()
+  {
+  }
+
+  public PythonInterpreterResponse(Class<T> responseTypeClassHandle)
+  {
+    responseTypeClass = responseTypeClassHandle;
+  }
+
+  T response;
+
+  public T getResponse()
+  {
+    return response;
+  }
+
+  public void setResponse(T response)
+  {
+    this.response = response;
+  }
+
+  public Map<String, Boolean> getCommandStatus()
+  {
+    return commandStatus;
+  }
+
+  public void setCommandStatus(Map<String, Boolean> commandStatus)
+  {
+    this.commandStatus = commandStatus;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonRequestResponse.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonRequestResponse.java
new file mode 100644
index 0000000000..9e3958ceba
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/PythonRequestResponse.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+
+public class PythonRequestResponse<T>
+{
+  PythonInterpreterRequest pythonInterpreterRequest;
+
+  PythonInterpreterResponse pythonInterpreterResponse;
+
+  long requestId;
+
+  long windowId;
+
+  long requestStartTime;
+
+  long requestCompletionTime;
+
+  public PythonInterpreterRequest getPythonInterpreterRequest()
+  {
+    return pythonInterpreterRequest;
+  }
+
+  public void setPythonInterpreterRequest(PythonInterpreterRequest pythonInterpreterRequest)
+  {
+    this.pythonInterpreterRequest = pythonInterpreterRequest;
+  }
+
+  public PythonInterpreterResponse getPythonInterpreterResponse()
+  {
+    return pythonInterpreterResponse;
+  }
+
+  public void setPythonInterpreterResponse(PythonInterpreterResponse pythonInterpreterResponse)
+  {
+    this.pythonInterpreterResponse = pythonInterpreterResponse;
+  }
+
+  public long getRequestId()
+  {
+    return requestId;
+  }
+
+  public void setRequestId(long requestId)
+  {
+    this.requestId = requestId;
+  }
+
+  public long getWindowId()
+  {
+    return windowId;
+  }
+
+  public void setWindowId(long windowId)
+  {
+    this.windowId = windowId;
+  }
+
+  public long getRequestStartTime()
+  {
+    return requestStartTime;
+  }
+
+  public void setRequestStartTime(long requestStartTime)
+  {
+    this.requestStartTime = requestStartTime;
+  }
+
+  public long getRequestCompletionTime()
+  {
+    return requestCompletionTime;
+  }
+
+  public void setRequestCompletionTime(long requestCompletionTime)
+  {
+    this.requestCompletionTime = requestCompletionTime;
+  }
+}
+
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/ScriptExecutionRequestPayload.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/ScriptExecutionRequestPayload.java
new file mode 100644
index 0000000000..403c73dea1
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/ScriptExecutionRequestPayload.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.requestresponse;
+
+public class ScriptExecutionRequestPayload
+{
+  String scriptName;
+
+  public String getScriptName()
+  {
+    return scriptName;
+  }
+
+  public void setScriptName(String scriptName)
+  {
+    this.scriptName = scriptName;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/package-info.java b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/package-info.java
new file mode 100644
index 0000000000..dce69a6f2d
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/requestresponse/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Evolving
+package org.apache.apex.malhar.python.base.requestresponse;
+
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/util/NDArrayKryoSerializer.java b/python/src/main/java/org/apache/apex/malhar/python/base/util/NDArrayKryoSerializer.java
new file mode 100644
index 0000000000..839f493011
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/util/NDArrayKryoSerializer.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.util;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import jep.NDArray;
+
+/***
+ * A handy Kryo serializer class that can be used to serialize and deserialize a JEP NDArray instance. It is
+ *  recommended that {@link NDimensionalArray} be used in lieu of this class, because NDArray is a data structure
+ *   highly specific to JEP and will not give flexibility if the python engine is changed in the future.
+ */
+public class NDArrayKryoSerializer extends Serializer<NDArray>
+{
+  private static final short TRUE_AS_SHORTINT = 1;
+
+  private static final short FALSE_AS_SHORTINT = 0;
+
+  @Override
+  public void setGenerics(Kryo kryo, Class[] generics)
+  {
+    super.setGenerics(kryo, generics);
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output, NDArray ndArray)
+  {
+
+    Object dataVal = ndArray.getData();
+    if (dataVal == null) {
+      return;
+    }
+    // NDArray throws an exception in its constructor if the data is not an array, so the value below is never null
+    Class classNameForArrayType = dataVal.getClass().getComponentType(); // would be null only for a non-array
+    int[] dimensions = ndArray.getDimensions();
+    boolean signedFlag = ndArray.isUnsigned(); // despite the name, this holds the unsigned flag
+    int arraySizeInSingleDimension = 1;
+    for (int aDimensionSize : dimensions) {
+      arraySizeInSingleDimension = arraySizeInSingleDimension * aDimensionSize;
+    }
+    // write the single dimension length
+    output.writeInt(arraySizeInSingleDimension);
+    // write the dimension of the dimensions int array
+    output.writeInt(dimensions.length);
+    // next we write the dimensions array itself
+    output.writeInts(dimensions);
+
+    // next write the unsigned flag
+    output.writeBoolean(signedFlag);
+
+    // write the data type of the N-dimensional Array
+    if (classNameForArrayType != null) {
+      output.writeString(classNameForArrayType.getCanonicalName());
+    } else {
+      output.writeString(null);
+    }
+
+    // write the array contents
+    if (dataVal != null) {
+      switch (classNameForArrayType.getCanonicalName()) {
+        case "float":
+          output.writeFloats((float[])dataVal);
+          break;
+        case "int":
+          output.writeInts((int[])dataVal);
+          break;
+        case "double":
+          output.writeDoubles((double[])dataVal);
+          break;
+        case "long":
+          output.writeLongs((long[])dataVal);
+          break;
+        case "short":
+          output.writeShorts((short[])dataVal);
+          break;
+        case "byte":
+          output.writeBytes((byte[])dataVal);
+          break;
+        case "boolean":
+          boolean[] originalBoolArray = (boolean[])dataVal;
+          short[] convertedBoolArray = new short[originalBoolArray.length];
+          for (int i = 0; i < originalBoolArray.length; i++) {
+            if (originalBoolArray[i]) {
+              convertedBoolArray[i] = TRUE_AS_SHORTINT;
+            } else {
+              convertedBoolArray[i] = FALSE_AS_SHORTINT;
+            }
+          }
+          output.writeShorts(convertedBoolArray);
+          break;
+        default:
+          throw new RuntimeException("Unsupported NDArray type serialization object");
+      }
+    }
+  }
+
+  @Override
+  public NDArray read(Kryo kryo, Input input, Class<NDArray> aClass)
+  {
+    int singleDimensionArrayLength = input.readInt();
+    int lengthOfDimensionsArray = input.readInt();
+    int[] dimensions = input.readInts(lengthOfDimensionsArray);
+    boolean signedFlag = input.readBoolean();
+
+    String dataType = input.readString();
+    if ( dataType == null) {
+      return null;
+    }
+    switch (dataType) {
+      case "float":
+        NDArray<float[]> floatNDArray = new NDArray<>(
+            input.readFloats(singleDimensionArrayLength),signedFlag,dimensions);
+        return floatNDArray;
+      case "int":
+        NDArray<int[]> intNDArray = new NDArray<>(
+            input.readInts(singleDimensionArrayLength),signedFlag,dimensions);
+        return intNDArray;
+      case "double":
+        NDArray<double[]> doubleNDArray = new NDArray<>(
+            input.readDoubles(singleDimensionArrayLength),signedFlag,dimensions);
+        return doubleNDArray;
+      case "long":
+        NDArray<long[]> longNDArray = new NDArray<>(
+            input.readLongs(singleDimensionArrayLength),signedFlag,dimensions);
+        return longNDArray;
+      case "short":
+        NDArray<short[]> shortNDArray = new NDArray<>(
+            input.readShorts(singleDimensionArrayLength),signedFlag,dimensions);
+        return shortNDArray;
+      case "byte":
+        NDArray<byte[]> byteNDArray = new NDArray<>(
+            input.readBytes(singleDimensionArrayLength),signedFlag,dimensions);
+        return byteNDArray;
+      case "boolean":
+        short[] shortsArray = input.readShorts(singleDimensionArrayLength);
+        boolean[] boolsArray = new boolean[shortsArray.length];
+        for (int i = 0; i < shortsArray.length; i++) {
+          if (TRUE_AS_SHORTINT == shortsArray[i]) {
+            boolsArray[i] = true;
+          } else {
+            boolsArray[i] = false;
+          }
+        }
+        NDArray<boolean[]> booleanNDArray = new NDArray<>(boolsArray,signedFlag,dimensions);
+        return booleanNDArray;
+      default:
+        return null;
+    }
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/util/NDimensionalArray.java b/python/src/main/java/org/apache/apex/malhar/python/base/util/NDimensionalArray.java
new file mode 100644
index 0000000000..c233855651
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/util/NDimensionalArray.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.util;
+
+import jep.NDArray;
+
+/***
+ * Represents a wrapper around a numpy array. A numpy array is built by first creating a single
+ * dimensional (flat) array and then specifying the dimensions. The dimensions specify how the
+ * flat array is sliced into the n-dimensional shape.
+ * @param <T> The data type of the single dimensional array. For example for a numpy array of type float, T will be
+ *           of type float[].
+ *           <p>
+ *           Only the following types are supported:
+ *          <ol>
+ *           <li>float[]</li>
+ *           <li>int[]</li>
+ *           <li>double[]</li>
+ *           <li>long[]</li>
+ *           <li>short[]</li>
+ *           <li>byte[]</li>
+ *           <li>boolean[]</li>
+ *          </ol>
+ *           </p>
+ *           <p>Complex types are not supported. See the example application in the test modules for code snippets and usage.</p>
+ */
+public class NDimensionalArray<T>
+{
+
+  int[] dimensions;
+
+  T data;
+
+  int lengthOfSequentialArray;
+
+  boolean signedFlag;
+
+  public NDimensionalArray()
+  {
+  }
+
+  public NDArray<T> toNDArray()
+  {
+    return new NDArray<T>(data,signedFlag,dimensions);
+  }
+
+  public int[] getDimensions()
+  {
+    return dimensions;
+  }
+
+  public void setDimensions(int[] dimensions)
+  {
+    this.dimensions = dimensions;
+  }
+
+  public int getLengthOfSequentialArray()
+  {
+    return lengthOfSequentialArray;
+  }
+
+  public void setLengthOfSequentialArray(int lengthOfSequentialArray)
+  {
+    this.lengthOfSequentialArray = lengthOfSequentialArray;
+  }
+
+  public boolean isSignedFlag()
+  {
+    return signedFlag;
+  }
+
+  public void setSignedFlag(boolean signedFlag)
+  {
+    this.signedFlag = signedFlag;
+  }
+
+  public T getData()
+  {
+    return data;
+  }
+
+  public void setData(T data)
+  {
+    this.data = data;
+  }
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/base/util/PythonRequestResponseUtil.java b/python/src/main/java/org/apache/apex/malhar/python/base/util/PythonRequestResponseUtil.java
new file mode 100644
index 0000000000..c998c20bbb
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/base/util/PythonRequestResponseUtil.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.util;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.apex.malhar.python.base.ApexPythonEngine;
+import org.apache.apex.malhar.python.base.WorkerExecutionMode;
+import org.apache.apex.malhar.python.base.requestresponse.EvalCommandRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.GenericCommandsRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.MethodCallRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.ScriptExecutionRequestPayload;
+
+/*** A handy utility class that implements the boilerplate code for building request objects. Only the commonly
+ * used request types are implemented.
+ *
+ */
+public class PythonRequestResponseUtil
+{
+  /***
+   * Builds the request object for run commands API request. See
+   * {@link ApexPythonEngine#runCommands(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for details
+   * @param commands The list of Python commands to run
+   * @param timeOut Timeout for the request to complete
+   * @param timeUnit Unit of time for the timeout
+   * @return A request object that can be passed to the Python Engine API for run commands
+   */
+  public static PythonInterpreterRequest<Void> buildRequestObjectForRunCommands(List<String> commands, long timeOut,
+      TimeUnit timeUnit)
+  {
+    GenericCommandsRequestPayload genericCommandsRequestPayload = new GenericCommandsRequestPayload();
+    genericCommandsRequestPayload.setGenericCommands(commands);
+    PythonInterpreterRequest<Void> request = new PythonInterpreterRequest<>(Void.class);
+    request.setTimeUnit(timeUnit);
+    request.setTimeout(timeOut);
+    request.setGenericCommandsRequestPayload(genericCommandsRequestPayload);
+    return request;
+  }
+
+  /***
+   * Builds the request object for the Eval command request. See
+   * {@link ApexPythonEngine#eval(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for details
+   * @param evalCommand The eval expression
+   * @param evalParams Variables that need to be substituted
+   * @param varNameToExtract Name of the variable to extract, if any, after the expression is evaluated. Can be null
+   * @param deleteVarAfterExtract Whether to delete the variable after the eval call
+   * @param timeOut Timeout for the API to complete processing
+   * @param timeUnit Units of time for the time out variable
+   * @param clazz The Class that represents the return type
+   * @param <T> The return type of the request
+   * @return The request object that can be used for the Eval command
+   */
+  public static <T> PythonInterpreterRequest<T> buildRequestForEvalCommand(
+      String evalCommand, Map<String,Object> evalParams, String varNameToExtract,
+      boolean deleteVarAfterExtract, long timeOut, TimeUnit timeUnit, Class<T> clazz)
+  {
+    PythonInterpreterRequest<T> request = new PythonInterpreterRequest<>(clazz);
+    EvalCommandRequestPayload evalCommandRequestPayload = new EvalCommandRequestPayload();
+    evalCommandRequestPayload.setEvalCommand(evalCommand);
+    evalCommandRequestPayload.setVariableNameToExtractInEvalCall(varNameToExtract);
+    evalCommandRequestPayload.setParamsForEvalCommand(evalParams);
+    evalCommandRequestPayload.setDeleteVariableAfterEvalCall(deleteVarAfterExtract);
+    request.setTimeUnit(timeUnit);
+    request.setTimeout(timeOut);
+    request.setEvalCommandRequestPayload(evalCommandRequestPayload);
+    return request;
+  }
+
+  /***
+   * Builds the request object for the Method call command. See
+   * {@link ApexPythonEngine#executeMethodCall(WorkerExecutionMode, long, long, PythonInterpreterRequest)} for details
+   * @param methodName Name of the method
+   * @param methodParams Parameters to the method
+   * @param timeOut Time allocated for completing the API
+   * @param timeUnit The units of time
+   * @param clazz The Class that represents the return type
+   * @param <T> The return type of the request
+   * @return The request object that can be used for method calls
+   */
+  public static <T> PythonInterpreterRequest<T> buildRequestForMethodCallCommand(
+      String methodName, List<Object> methodParams, long timeOut, TimeUnit timeUnit, Class<T> clazz)
+  {
+    PythonInterpreterRequest<T> request = new PythonInterpreterRequest<>(clazz);
+    MethodCallRequestPayload methodCallRequestPayload = new MethodCallRequestPayload();
+    methodCallRequestPayload.setNameOfMethod(methodName);
+    methodCallRequestPayload.setArgs(methodParams);
+    request.setTimeUnit(timeUnit);
+    request.setTimeout(timeOut);
+    request.setMethodCallRequest(methodCallRequestPayload);
+    return request;
+  }
+
+  /***
+   * Builds a request object that can be used for executing the script call commands.
+   * @param scriptPath Full path to the file name containing the script
+   * @param timeOut The time that can be used to complete the execution of the script
+   * @param timeUnit Unit of time for time out parameter
+   * @param clazz The class that can be used to represent the return type
+   * @param <T> The return type of the request
+   * @return The Request object that can be used for a script call invocation
+   */
+  public static <T> PythonInterpreterRequest<T> buildRequestForScriptCallCommand(
+      String scriptPath, long timeOut, TimeUnit timeUnit, Class<T> clazz)
+  {
+    PythonInterpreterRequest<T> request = new PythonInterpreterRequest<>(clazz);
+    ScriptExecutionRequestPayload scriptExecutionRequestPayload = new ScriptExecutionRequestPayload();
+    scriptExecutionRequestPayload.setScriptName(scriptPath);
+    request.setTimeUnit(timeUnit);
+    request.setTimeout(timeOut);
+    request.setScriptExecutionRequestPayload(scriptExecutionRequestPayload);
+    return request;
+  }
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplication.java b/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplication.java
new file mode 100644
index 0000000000..9a38d81fe1
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplication.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.python;
+
+import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.function.FunctionOperator;
+import org.apache.apex.malhar.lib.function.FunctionOperatorUtil;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * @since 3.8.0
+ */
+@ApplicationAnnotation(name = "PythonOperatorExample")
+public class PythonExecutorApplication implements StreamingApplication
+{
+  @VisibleForTesting
+  Function.MapFunction<Object, ?> outputFn = FunctionOperatorUtil.CONSOLE_SINK_FN;
+
+  @VisibleForTesting
+  PythonPayloadPOJOGenerator pojoDataGenerator;
+
+  @VisibleForTesting
+  SimplePythonOpOperator simplePythonOpOperator;
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    pojoDataGenerator = dag.addOperator("Input", new PythonPayloadPOJOGenerator());
+    simplePythonOpOperator = dag.addOperator("pythonprocessor", new SimplePythonOpOperator());
+    FunctionOperator.MapFunctionOperator<Object, ?> output = dag.addOperator("out",
+        new FunctionOperator.MapFunctionOperator<>(outputFn));
+    dag.addStream("InputToPython", pojoDataGenerator.output, simplePythonOpOperator.input);
+    dag.addStream("PythonToOutput", simplePythonOpOperator.outputPort, output.input);
+
+  }
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplicationTest.java b/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplicationTest.java
new file mode 100644
index 0000000000..fe3d69b54f
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/PythonExecutorApplicationTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.python.test.JepPythonTestContext;
+import org.apache.apex.malhar.python.test.PythonAvailabilityTestRule;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Attribute;
+
+import static org.junit.Assert.assertEquals;
+
+public class PythonExecutorApplicationTest
+{
+  private static final List<Object> results = Collections.synchronizedList(new ArrayList<>());
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(PythonExecutorApplicationTest.class);
+
+  @Rule
+  public PythonAvailabilityTestRule jepAvailabilityBasedTest = new PythonAvailabilityTestRule();
+
+
+  @SuppressWarnings("serial")
+  private static final Function.MapFunction<Object, Void> outputFn = new Function.MapFunction<Object, Void>()
+  {
+    @Override
+    public Void f(Object input)
+    {
+      results.add(input);
+      return null;
+    }
+  };
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testApplication() throws Exception
+  {
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(Launcher.LaunchMode.EMBEDDED);
+    Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true);
+    PythonExecutorApplication pythonExecutorApplication = new PythonExecutorApplication();
+    pythonExecutorApplication.outputFn =  outputFn;
+    Launcher.AppHandle appHandle = launcher.launchApp(pythonExecutorApplication, conf, launchAttributes);
+    int sleepTimeCounterForLoopExit = 0;
+    int sleepTimePerIteration = 1000;
+    // wait until expected result count or timeout
+    while (results.size() < pythonExecutorApplication.pojoDataGenerator.getMaxTuples()) {
+      sleepTimeCounterForLoopExit += sleepTimePerIteration;
+      if (sleepTimeCounterForLoopExit > 30000) {
+        break;
+      }
+      LOG.info("Test sleeping until the application time out is reached");
+      Thread.sleep(sleepTimePerIteration);
+    }
+    appHandle.shutdown(Launcher.ShutdownMode.KILL);
+    assertEquals(pythonExecutorApplication.pojoDataGenerator.getMaxTuples(), results.size());
+  }
+
+}
+
+
+
+
+
+
+
diff --git a/python/src/test/java/org/apache/apex/malhar/python/PythonPayloadPOJOGenerator.java b/python/src/test/java/org/apache/apex/malhar/python/PythonPayloadPOJOGenerator.java
new file mode 100644
index 0000000000..6996d94c9f
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/PythonPayloadPOJOGenerator.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.python;
+
+import java.util.Random;
+
+import javax.validation.constraints.Min;
+
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+
+/**
+ * Generates and emits a simple float and int for python operator to consume
+ *
+ * @since 3.8.0
+ */
+public class PythonPayloadPOJOGenerator implements InputOperator
+{
+  private long tuplesCounter = 0;
+  private long currentWindowTuplesCounter = 0;
+
+  // Limit number of emitted tuples per window
+  @Min(1)
+  private long maxTuplesPerWindow = 150;
+
+  @Min(1)
+  private long maxTuples = 300;
+
+  private final Random random = new Random();
+
+  private static final int MAX_RANDOM_INT = 100;
+
+  public static final int DIMENSION_SIZE = 2;
+
+  public static int[] intDimensionSums = new int[ DIMENSION_SIZE * DIMENSION_SIZE ];
+
+  public static float[] floatDimensionSums = new float[DIMENSION_SIZE * DIMENSION_SIZE];
+
+  public final transient DefaultOutputPort<PythonProcessingPojo> output = new DefaultOutputPort<>();
+
+  public PythonPayloadPOJOGenerator()
+  {
+    for ( int i = 0; i < (DIMENSION_SIZE * DIMENSION_SIZE ); i++) {
+      intDimensionSums[i] = 0;
+      floatDimensionSums[i] = 0;
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowTuplesCounter = 0;
+  }
+
+  @Override
+  public void endWindow()
+  {
+
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    while ( ( currentWindowTuplesCounter < maxTuplesPerWindow) && (tuplesCounter < maxTuples) ) {
+      PythonProcessingPojo pythonProcessingPojo = new PythonProcessingPojo();
+      pythonProcessingPojo.setX(random.nextInt(MAX_RANDOM_INT));
+      pythonProcessingPojo.setY(random.nextFloat());
+
+      float[] f = new float[( DIMENSION_SIZE * DIMENSION_SIZE)];
+      for ( int i = 0; i < (DIMENSION_SIZE * DIMENSION_SIZE ); i++) {
+        f[i] = random.nextFloat();
+        floatDimensionSums[ i % DIMENSION_SIZE ] = floatDimensionSums[ i % DIMENSION_SIZE ] + f[i];
+      }
+      NDimensionalArray<float[]> nDimensionalFloatArray = new NDimensionalArray<>();
+      nDimensionalFloatArray.setData(f);
+      nDimensionalFloatArray.setDimensions(new int[] {DIMENSION_SIZE, DIMENSION_SIZE});
+      nDimensionalFloatArray.setLengthOfSequentialArray(f.length);
+      nDimensionalFloatArray.setSignedFlag(false);
+      pythonProcessingPojo.setNumpyFloatArray(nDimensionalFloatArray);
+
+      int[] ints = new int[( DIMENSION_SIZE * DIMENSION_SIZE)];
+      for ( int i = 0; i < (DIMENSION_SIZE * DIMENSION_SIZE ); i++) {
+        ints[i] = random.nextInt(MAX_RANDOM_INT);
+        intDimensionSums[ i % DIMENSION_SIZE ] = intDimensionSums [ i % DIMENSION_SIZE ] + ints[i];
+      }
+      NDimensionalArray<int[]> nDimensionalIntArray = new NDimensionalArray<>();
+      nDimensionalIntArray.setData(ints);
+      nDimensionalIntArray.setDimensions(new int[] {DIMENSION_SIZE, DIMENSION_SIZE});
+      nDimensionalIntArray.setLengthOfSequentialArray(ints.length);
+      nDimensionalIntArray.setSignedFlag(false);
+      pythonProcessingPojo.setNumpyIntArray(nDimensionalIntArray);
+      output.emit(pythonProcessingPojo);
+      currentWindowTuplesCounter += 1;
+      tuplesCounter += 1;
+    }
+  }
+
+  public long getMaxTuples()
+  {
+    return maxTuples;
+  }
+
+  public void setMaxTuples(long maxTuples)
+  {
+    this.maxTuples = maxTuples;
+  }
+
+  public long getMaxTuplesPerWindow()
+  {
+    return maxTuplesPerWindow;
+  }
+
+  public void setMaxTuplesPerWindow(long maxTuplesPerWindow)
+  {
+    this.maxTuplesPerWindow = maxTuplesPerWindow;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  public static int[] getIntDimensionSums()
+  {
+    return intDimensionSums;
+  }
+
+  public static void setIntDimensionSums(int[] intDimensionSums)
+  {
+    PythonPayloadPOJOGenerator.intDimensionSums = intDimensionSums;
+  }
+
+  public static float[] getFloatDimensionSums()
+  {
+    return floatDimensionSums;
+  }
+
+  public static void setFloatDimensionSums(float[] floatDimensionSums)
+  {
+    PythonPayloadPOJOGenerator.floatDimensionSums = floatDimensionSums;
+  }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/PythonProcessingPojo.java b/python/src/test/java/org/apache/apex/malhar/python/PythonProcessingPojo.java
new file mode 100644
index 0000000000..33c3452919
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/PythonProcessingPojo.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python;
+
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+
+public class PythonProcessingPojo
+{
+
+  private float y;
+
+  private int x;
+
+
+  private NDimensionalArray<float[]> numpyFloatArray;
+
+  private NDimensionalArray<int[]> numpyIntArray;
+
+
+  public float getY()
+  {
+    return y;
+  }
+
+  public void setY(float y)
+  {
+    this.y = y;
+  }
+
+  public int getX()
+  {
+    return x;
+  }
+
+  public void setX(int x)
+  {
+    this.x = x;
+  }
+
+  public NDimensionalArray<float[]> getNumpyFloatArray()
+  {
+    return numpyFloatArray;
+  }
+
+  public void setNumpyFloatArray(NDimensionalArray<float[]> numpyFloatArray)
+  {
+    this.numpyFloatArray = numpyFloatArray;
+  }
+
+  public NDimensionalArray<int[]> getNumpyIntArray()
+  {
+    return numpyIntArray;
+  }
+
+  public void setNumpyIntArray(NDimensionalArray<int[]> numpyIntArray)
+  {
+    this.numpyIntArray = numpyIntArray;
+  }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/SimplePythonOpOperator.java b/python/src/test/java/org/apache/apex/malhar/python/SimplePythonOpOperator.java
new file mode 100644
index 0000000000..69ece35e88
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/SimplePythonOpOperator.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.apex.malhar.python.base.ApexPythonEngine;
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.BasePythonExecutionOperator;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.WorkerExecutionMode;
+import org.apache.apex.malhar.python.base.jep.SpinPolicy;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.base.util.NDimensionalArray;
+import org.apache.apex.malhar.python.base.util.PythonRequestResponseUtil;
+
+public class SimplePythonOpOperator extends BasePythonExecutionOperator<PythonProcessingPojo>
+{
+  private Map<String,PythonRequestResponse<NDimensionalArray>> lastKnownResponse;
+
+  @Override
+  public Map<PythonInterpreterConfig, Object> getPreInitConfigurations()
+  {
+    Map<PythonInterpreterConfig,Object> preInitConfigs = new HashMap<>();
+    Set<String> sharedLibsList = new HashSet<>();
+    sharedLibsList.add("numpy");
+    preInitConfigs.put(PythonInterpreterConfig.PYTHON_SHARED_LIBS, sharedLibsList);
+    // The next two configs enable a very low latency mode that trades higher CPU usage for lower latency.
+    // The defaults are saner; the following configs override them.
+    preInitConfigs.put(PythonInterpreterConfig.IDLE_INTERPRETER_SPIN_POLICY, "" + SpinPolicy.BUSY_SPIN.name());
+    preInitConfigs.put(PythonInterpreterConfig.REQUEST_QUEUE_WAIT_SPIN_POLICY,
+        com.conversantmedia.util.concurrent.SpinPolicy.SPINNING);
+    return preInitConfigs;
+  }
+
+  @Override
+  public PythonRequestResponse processPythonCodeForIncomingTuple(PythonProcessingPojo input, ApexPythonEngine pythonEngineRef)
+    throws ApexPythonInterpreterException
+  {
+    Map<String,Object> evalParams = new HashMap<>();
+    evalParams.put("intArrayToAdd",input.getNumpyIntArray());
+    evalParams.put("floatArrayToAdd",input.getNumpyFloatArray());
+    // Assigns the sum to npval; the commented-out print variant below echoes to the console to validate redirect
+    String evalCommand = "npval=np.add(intMatrix,intArrayToAdd)";
+    //String evalCommand = "print(type(intArrayToAdd))";
+    PythonInterpreterRequest<NDimensionalArray> request = PythonRequestResponseUtil.buildRequestForEvalCommand(
+        evalCommand,evalParams,"intMatrix",false, 20,
+        TimeUnit.MILLISECONDS, NDimensionalArray.class);
+    lastKnownResponse = pythonEngineRef.eval(
+        WorkerExecutionMode.ANY,currentWindowId, requestCounterForThisWindow,request);
+    for ( String evalCommandSubmitted: lastKnownResponse.keySet()) {
+      return lastKnownResponse.get(evalCommandSubmitted); // we just need the response from one of the N workers.
+    }
+    return null;
+  }
+
+  @Override
+  public void processPostSetUpPythonInstructions(ApexPythonEngine pythonEngineRef) throws ApexPythonInterpreterException
+  {
+    List<String> commandsToRun = new ArrayList<>();
+    commandsToRun.add("import sys");
+    commandsToRun.add("import numpy as np");
+    commandsToRun.add("intMatrix = np.ones((2,2),dtype=int)");
+    commandsToRun.add("floatMatrix = np.ones((2,2),dtype=float)");
+    pythonEngineRef.runCommands(WorkerExecutionMode.BROADCAST,0L,0L,
+        PythonRequestResponseUtil.buildRequestObjectForRunCommands(commandsToRun,1, TimeUnit.SECONDS));
+  }
+
+  public Map<String, PythonRequestResponse<NDimensionalArray>> getLastKnownResponse()
+  {
+    return lastKnownResponse;
+  }
+
+  public void setLastKnownResponse(Map<String, PythonRequestResponse<NDimensionalArray>> lastKnownResponse)
+  {
+    this.lastKnownResponse = lastKnownResponse;
+  }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/base/jep/BaseJEPTest.java b/python/src/test/java/org/apache/apex/malhar/python/base/jep/BaseJEPTest.java
new file mode 100644
index 0000000000..7a1c1c10fe
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/base/jep/BaseJEPTest.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.ApexPythonInterpreterException;
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.requestresponse.EvalCommandRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.GenericCommandsRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.MethodCallRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterResponse;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.base.requestresponse.ScriptExecutionRequestPayload;
+import org.apache.apex.malhar.python.test.BasePythonTest;
+
+import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
+import com.conversantmedia.util.concurrent.SpinPolicy;
+
+public class BaseJEPTest extends BasePythonTest
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(BaseJEPTest.class);
+
+  public static boolean JEP_INITIALIZED = false;
+
+  private static Object lockToInitializeJEP = new Object();
+
+  static InterpreterThread pythonEngineThread;
+
+  static InterpreterWrapper interpreterWrapper;
+
+  static JepPythonEngine jepPythonEngine;
+
+  static ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+  static BlockingQueue<PythonRequestResponse> requestQueue =
+      new DisruptorBlockingQueue<PythonRequestResponse>(8, SpinPolicy.WAITING);
+
+  static BlockingQueue<PythonRequestResponse> responseQueue =
+      new DisruptorBlockingQueue<PythonRequestResponse>(8, SpinPolicy.WAITING);
+
+  static BlockingQueue<PythonRequestResponse> delayedResponseQueueForWrapper =
+      new DisruptorBlockingQueue<PythonRequestResponse>(8, SpinPolicy.WAITING);
+
+
+  public static void initJEPThread() throws Exception
+  {
+    if (!JEP_INITIALIZED) {
+      synchronized (lockToInitializeJEP) {
+        if (!JEP_INITIALIZED) {
+          // Interpreter for thread based tests
+          pythonEngineThread = new InterpreterThread(requestQueue,responseQueue,"unittests-1");
+          pythonEngineThread.preInitInterpreter(new HashMap<PythonInterpreterConfig,Object>());
+          executorService.submit(pythonEngineThread);
+
+          // interpreter wrapper for wrapper based tests
+          interpreterWrapper = new InterpreterWrapper("unit-test-wrapper",delayedResponseQueueForWrapper,
+              SpinPolicy.SPINNING);
+          interpreterWrapper.startInterpreter();
+
+          // JEP python engine tests
+          jepPythonEngine = new JepPythonEngine("unit-tests-jeppythonengine",5);
+          jepPythonEngine.preInitInterpreter(new HashMap<PythonInterpreterConfig,Object>());
+          jepPythonEngine.startInterpreter();
+          JEP_INITIALIZED = true;
+        }
+      }
+    }
+  }
+
+  private void setCommonConstructsForRequestResponseObject(PythonCommandType commandType,
+      PythonInterpreterRequest request, PythonRequestResponse requestResponse )
+    throws ApexPythonInterpreterException
+  {
+    request.setCommandType(commandType);
+    requestResponse.setRequestStartTime(System.currentTimeMillis());
+    requestResponse.setRequestId(1L);
+    requestResponse.setWindowId(1L);
+    switch (commandType) {
+      case EVAL_COMMAND:
+        EvalCommandRequestPayload payload = new EvalCommandRequestPayload();
+        request.setEvalCommandRequestPayload(payload);
+        break;
+      case METHOD_INVOCATION_COMMAND:
+        MethodCallRequestPayload methodCallRequest = new MethodCallRequestPayload();
+        request.setMethodCallRequest(methodCallRequest);
+        break;
+      case SCRIPT_COMMAND:
+        ScriptExecutionRequestPayload scriptPayload = new ScriptExecutionRequestPayload();
+        request.setScriptExecutionRequestPayload(scriptPayload);
+        break;
+      case GENERIC_COMMANDS:
+        GenericCommandsRequestPayload payloadForGenericCommands = new GenericCommandsRequestPayload();
+        request.setGenericCommandsRequestPayload(payloadForGenericCommands);
+        break;
+      default:
+        throw new ApexPythonInterpreterException("Unsupported command type");
+    }
+
+  }
+
+  public PythonRequestResponse<Void> buildRequestResponseObjectForVoidPayload(PythonCommandType commandType)
+    throws Exception
+  {
+    PythonRequestResponse<Void> requestResponse = new PythonRequestResponse<>();
+    PythonInterpreterRequest<Void> request = new PythonInterpreterRequest<>(Void.class);
+    PythonInterpreterResponse<Void> response = new PythonInterpreterResponse<>(Void.class);
+    requestResponse.setPythonInterpreterRequest(request);
+    requestResponse.setPythonInterpreterResponse(response);
+    setCommonConstructsForRequestResponseObject(commandType,request,requestResponse);
+    return requestResponse;
+  }
+
+  public PythonRequestResponse<Long> buildRequestResponseObjectForLongPayload(
+      PythonCommandType commandType) throws Exception
+  {
+    PythonRequestResponse<Long> requestResponse = new PythonRequestResponse<>();
+    PythonInterpreterRequest<Long> request = new PythonInterpreterRequest<>(Long.class);
+    requestResponse.setPythonInterpreterRequest(request);
+    PythonInterpreterResponse<Long> response = new PythonInterpreterResponse<>(Long.class);
+    requestResponse.setPythonInterpreterRequest(request);
+    requestResponse.setPythonInterpreterResponse(response);
+    setCommonConstructsForRequestResponseObject(commandType,request,requestResponse);
+    return requestResponse;
+  }
+
+
+
+  public PythonRequestResponse<Integer> buildRequestResponseObjectForIntPayload(
+      PythonCommandType commandType) throws Exception
+  {
+    PythonRequestResponse<Integer> requestResponse = new PythonRequestResponse<>();
+    PythonInterpreterRequest<Integer> request = new PythonInterpreterRequest<>(Integer.class);
+    requestResponse.setPythonInterpreterRequest(request);
+    PythonInterpreterResponse<Integer> response = new PythonInterpreterResponse<>(Integer.class);
+    requestResponse.setPythonInterpreterRequest(request);
+    requestResponse.setPythonInterpreterResponse(response);
+    setCommonConstructsForRequestResponseObject(commandType,request,requestResponse);
+    return requestResponse;
+  }
+
+
+  protected PythonRequestResponse<Void> runCommands(List<String> commands) throws Exception
+  {
+    PythonRequestResponse<Void> runCommandsRequest = buildRequestResponseObjectForVoidPayload(
+        PythonCommandType.GENERIC_COMMANDS);
+    runCommandsRequest.getPythonInterpreterRequest().getGenericCommandsRequestPayload().setGenericCommands(commands);
+    pythonEngineThread.getRequestQueue().put(runCommandsRequest);
+    Thread.sleep(1000); // wait for command to be processed
+    return pythonEngineThread.getResponseQueue().poll(1, TimeUnit.SECONDS);
+  }
+
+
+  protected PythonInterpreterRequest<Long> buildRequestObjectForLongEvalCommand(String command, String returnVar,
+      Map<String,Object> paramsForEval, long timeOut, TimeUnit timeUnit, boolean deleteVariable)
+  {
+    PythonInterpreterRequest<Long> request = new PythonInterpreterRequest<>(Long.class);
+    request.setTimeout(timeOut);
+    request.setTimeUnit(timeUnit);
+    EvalCommandRequestPayload evalCommandRequestPayload = new EvalCommandRequestPayload();
+    request.setEvalCommandRequestPayload(evalCommandRequestPayload);
+    evalCommandRequestPayload.setParamsForEvalCommand(paramsForEval);
+    evalCommandRequestPayload.setDeleteVariableAfterEvalCall(deleteVariable);
+    evalCommandRequestPayload.setVariableNameToExtractInEvalCall(returnVar);
+    evalCommandRequestPayload.setEvalCommand(command);
+    request.setExpectedReturnType(Long.class);
+    return request;
+  }
+
+  protected PythonInterpreterRequest<Void> buildRequestObjectForVoidGenericCommand(List<String> commands, long timeOut,
+      TimeUnit timeUnit)
+  {
+    PythonInterpreterRequest<Void> genericCommandRequest = new PythonInterpreterRequest<>(Void.class);
+    genericCommandRequest.setTimeout(timeOut);
+    genericCommandRequest.setTimeUnit(timeUnit);
+    GenericCommandsRequestPayload genericCommandsRequestPayload = new GenericCommandsRequestPayload();
+    genericCommandsRequestPayload.setGenericCommands(commands);
+    genericCommandRequest.setExpectedReturnType(Void.class);
+    genericCommandRequest.setGenericCommandsRequestPayload(genericCommandsRequestPayload);
+    return genericCommandRequest;
+  }
+
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterThreadTest.java b/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterThreadTest.java
new file mode 100644
index 0000000000..9e3760526d
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterThreadTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.requestresponse.EvalCommandRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.MethodCallRequestPayload;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.test.JepPythonTestContext;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class InterpreterThreadTest extends BaseJEPTest
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(InterpreterThreadTest.class);
+
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testRunCommands() throws Exception
+  {
+    long currentTime = System.currentTimeMillis();
+    File tempFile = File.createTempFile("apexpythonunittestruncommands-", ".txt");
+    tempFile.deleteOnExit();
+    String filePath = tempFile.getAbsolutePath();
+    assertEquals(0L,tempFile.length());
+
+    List<String> commands = new ArrayList<>();
+    commands.add("fileHandle  = open('" + filePath + "', 'w')");
+    commands.add("fileHandle.write('" + currentTime + "')");
+    commands.add("fileHandle.flush()");
+    commands.add("fileHandle.close()");
+    runCommands(commands);
+    assertEquals(("" + currentTime).length(), tempFile.length());
+
+    List<String> errorCommands = new ArrayList<>();
+    errorCommands.add("1+2");
+    errorCommands.add("3+");
+    PythonRequestResponse<Void> response = runCommands(errorCommands);
+    Map<String,Boolean> responseStatus = response.getPythonInterpreterResponse().getCommandStatus();
+    assertTrue(responseStatus.get(errorCommands.get(0)));
+    assertFalse(responseStatus.get(errorCommands.get(1)));
+  }
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testMethodCall() throws Exception
+  {
+    String methodName = "jepMultiply";
+    List<String> commands = new ArrayList<>();
+    commands.add("def " + methodName + "(firstnum, secondnum):\n" +
+        "\treturn (firstnum * secondnum)\n"); // Note that this cannot be split as multiple commands
+    runCommands(commands);
+
+    List<Object> params = new ArrayList<>();
+    params.add(5L);
+    params.add(25L);
+
+    PythonRequestResponse<Long> methodCallRequest = buildRequestResponseObjectForLongPayload(
+        PythonCommandType.METHOD_INVOCATION_COMMAND);
+    MethodCallRequestPayload requestPayload = methodCallRequest.getPythonInterpreterRequest().getMethodCallRequest();
+    requestPayload.setNameOfMethod(methodName);
+    requestPayload.setArgs(params);
+    methodCallRequest.getPythonInterpreterRequest().setExpectedReturnType(Long.class);
+
+    pythonEngineThread.getRequestQueue().put(methodCallRequest);
+    Thread.sleep(1000); // wait for command to be processed
+    PythonRequestResponse<Long> methodCallResponse = pythonEngineThread.getResponseQueue().poll(1,
+        TimeUnit.SECONDS);
+    assertEquals(methodCallResponse.getPythonInterpreterResponse().getResponse(),125L);
+    Map<String,Boolean> commandStatus = methodCallResponse.getPythonInterpreterResponse().getCommandStatus();
+    assertTrue(commandStatus.get(methodName));
+
+    params.remove(1);
+    methodCallRequest = buildRequestResponseObjectForLongPayload(PythonCommandType.METHOD_INVOCATION_COMMAND);
+    requestPayload = methodCallRequest
+        .getPythonInterpreterRequest().getMethodCallRequest();
+    requestPayload.setNameOfMethod(methodName);
+    requestPayload.setArgs(params);
+    methodCallRequest.getPythonInterpreterRequest().setExpectedReturnType(Long.class);
+
+    pythonEngineThread.getRequestQueue().put(methodCallRequest);
+    Thread.sleep(1000); // wait for command to be processed
+    methodCallResponse = pythonEngineThread.getResponseQueue().poll(1, TimeUnit.SECONDS);
+    commandStatus = methodCallResponse.getPythonInterpreterResponse().getCommandStatus();
+    assertFalse(commandStatus.get(methodName));
+  }
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testScriptCall() throws Exception
+  {
+    File tempFileForScript = File.createTempFile("apexpythonunittestscript-", ".py");
+    tempFileForScript.deleteOnExit();
+    String filePathForFactorialScript = tempFileForScript.getAbsolutePath();
+    migrateFileFromResourcesFolderToTemp("factorial.py",filePathForFactorialScript);
+    PythonRequestResponse<Void> methodCallRequest = buildRequestResponseObjectForVoidPayload(
+        PythonCommandType.SCRIPT_COMMAND);
+    methodCallRequest.getPythonInterpreterRequest().getScriptExecutionRequestPayload().setScriptName(
+        filePathForFactorialScript);
+    pythonEngineThread.getRequestQueue().put(methodCallRequest);
+    Thread.sleep(1000); // wait for command to be processed
+    PythonRequestResponse<Void> methodCallResponse = pythonEngineThread.getResponseQueue().poll(1,
+        TimeUnit.SECONDS);
+    Map<String,Boolean> commandStatus = methodCallResponse.getPythonInterpreterResponse().getCommandStatus();
+    assertTrue(commandStatus.get(filePathForFactorialScript));
+    try (BufferedReader br = new BufferedReader(new FileReader("target/factorial-result.txt"))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        assertEquals(120,Integer.parseInt(line)); // assert factorial is calculated as written in script in resources
+        break; // There is only one line in the file per the python script
+      }
+    }
+  }
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testEvalCall() throws Exception
+  {
+    String expression = "x = a + b";
+    Random random = new Random();
+    int a = random.nextInt(100);
+    int b = random.nextInt(100);
+    Map<String,Object> argsForEval = new HashMap<>();
+    argsForEval.put("a",a);
+    argsForEval.put("b",b);
+    PythonRequestResponse<Long> methodCallRequest = buildRequestResponseObjectForLongPayload(
+        PythonCommandType.EVAL_COMMAND);
+    EvalCommandRequestPayload payload = methodCallRequest.getPythonInterpreterRequest().getEvalCommandRequestPayload();
+    payload.setEvalCommand(expression);
+    payload.setParamsForEvalCommand(argsForEval);
+    payload.setDeleteVariableAfterEvalCall(true);
+    payload.setVariableNameToExtractInEvalCall("x");
+    methodCallRequest.getPythonInterpreterRequest().setExpectedReturnType(Long.class);
+    pythonEngineThread.getRequestQueue().put(methodCallRequest);
+    Thread.sleep(1000); // wait for command to be processed
+    PythonRequestResponse<Integer> methodCallResponse = pythonEngineThread.getResponseQueue().poll(1,
+        TimeUnit.SECONDS);
+    Map<String,Boolean> commandStatus = methodCallResponse.getPythonInterpreterResponse().getCommandStatus();
+    assertTrue(commandStatus.get(expression));
+    assertEquals(methodCallResponse.getPythonInterpreterResponse().getResponse(),(long)(a + b));
+  }
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapperTest.java b/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapperTest.java
new file mode 100644
index 0000000000..6c2877d232
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/base/jep/InterpreterWrapperTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.test.JepPythonTestContext;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class InterpreterWrapperTest extends BaseJEPTest
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(InterpreterWrapperTest.class);
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testTimeOuts() throws Exception
+  {
+    List<String> sequenceOfCommands = new ArrayList<>();
+    sequenceOfCommands.add("import time");
+    sequenceOfCommands.add("time.sleep(1)");
+
+    PythonInterpreterRequest<Void> requestOne = buildRequestObjectForVoidGenericCommand(
+        sequenceOfCommands,3,TimeUnit.SECONDS);
+    PythonRequestResponse<Void> resultOne = interpreterWrapper.runCommands(1L,1L, requestOne);
+    assertNotNull(resultOne);
+
+    requestOne.setTimeUnit(TimeUnit.MILLISECONDS);
+    requestOne.setTimeout(5);
+    PythonRequestResponse<Void> resultTwo = interpreterWrapper.runCommands(1L,1L,requestOne);
+    assertNull(resultTwo);
+
+  }
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testDelayedResponseQueue() throws Exception
+  {
+    List<String> sequenceOfCommands = new ArrayList<>();
+    sequenceOfCommands.add("import time");
+    sequenceOfCommands.add("x=4;time.sleep(1)");
+
+    PythonInterpreterRequest<Void> requestOne = buildRequestObjectForVoidGenericCommand(
+        sequenceOfCommands,300,TimeUnit.MILLISECONDS);
+    PythonRequestResponse<Void> resultOne = interpreterWrapper.runCommands(1L,1L,requestOne);
+
+    HashMap<String,Object> evalParams = new HashMap<>();
+    evalParams.put("y", 2);
+
+    PythonInterpreterRequest<Long> requestTwo = buildRequestObjectForLongEvalCommand(
+        "x = x * y;time.sleep(1)","x",evalParams,10,TimeUnit.MILLISECONDS,false);
+    PythonRequestResponse<Long> result = interpreterWrapper.eval(1L,1L,requestTwo);
+
+    Thread.sleep(3000);
+
+    // previous requests are drained from the queue only when the next command is issued, hence the command below
+    sequenceOfCommands = new ArrayList<>();
+    sequenceOfCommands.add("x=5");
+
+    PythonInterpreterRequest<Void> requestThree = buildRequestObjectForVoidGenericCommand(
+        sequenceOfCommands,300,TimeUnit.MILLISECONDS);
+    PythonRequestResponse<Void> resultThree = interpreterWrapper.runCommands(1L,1L,requestThree);
+
+    assertFalse(delayedResponseQueueForWrapper.isEmpty());
+    assertEquals(2, delayedResponseQueueForWrapper.drainTo(new ArrayList<>()));
+
+  }
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/base/jep/JepPythonEngineTest.java b/python/src/test/java/org/apache/apex/malhar/python/base/jep/JepPythonEngineTest.java
new file mode 100644
index 0000000000..98a1ee1bf0
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/base/jep/JepPythonEngineTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.base.jep;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.PythonInterpreterConfig;
+import org.apache.apex.malhar.python.base.WorkerExecutionMode;
+import org.apache.apex.malhar.python.base.requestresponse.PythonCommandType;
+import org.apache.apex.malhar.python.base.requestresponse.PythonInterpreterRequest;
+import org.apache.apex.malhar.python.base.requestresponse.PythonRequestResponse;
+import org.apache.apex.malhar.python.test.JepPythonTestContext;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class JepPythonEngineTest extends BaseJEPTest
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(JepPythonEngineTest.class);
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testSelectWorkerForCurrentCall() throws Exception
+  {
+    Set<String> overloadedWorkers = new HashSet<>();
+    List<String> busyCommands = new ArrayList<>();
+    busyCommands.add("import time");
+    busyCommands.add("time.sleep(5)");
+    PythonInterpreterRequest<Void> busyCommandRequest = buildRequestObjectForVoidGenericCommand(busyCommands,
+        2,TimeUnit.MILLISECONDS);
+
+    for ( int i = 0; i < jepPythonEngine.getNumWorkerThreads() - 1; i++) {
+      InterpreterWrapper aWrapper = jepPythonEngine.getWorkers().get(i);
+      aWrapper.runCommands(1L,i,busyCommandRequest);
+      assertTrue(aWrapper.isCurrentlyBusy());
+      overloadedWorkers.add(aWrapper.getInterpreterId());
+    }
+    InterpreterWrapper candidateWrapperForExecution = null;
+    InterpreterWrapper validCandidateWrapperForExecution = null;
+    int counterForNullWorkers = 0;
+    int counterForValidWorkers = 0;
+    for ( int i = 0; i < jepPythonEngine.getNumWorkerThreads(); i++) {
+      candidateWrapperForExecution = jepPythonEngine.selectWorkerForCurrentCall(i);
+      if ( candidateWrapperForExecution == null) {
+        counterForNullWorkers += 1;
+      } else {
+        counterForValidWorkers += 1;
+        validCandidateWrapperForExecution = candidateWrapperForExecution;
+      }
+    }
+    // All numWorkerThreads calls should succeed because selectWorkerForCurrentCall iterates over all workers to
+    // find a free one. The last (5th) worker was not given a busy command, so every call should return a worker
+    assertEquals(jepPythonEngine.getNumWorkerThreads(), counterForValidWorkers);
+    assertEquals( 0, counterForNullWorkers); // None of the attempts should fail to get a worker
+    // Also we can only get that worker which has not been assigned a sleep instruction
+    assertFalse(overloadedWorkers.contains(validCandidateWrapperForExecution.getInterpreterId()));
+    Thread.sleep(5000); // all the python workers must be free after this line
+    // we now test for all workers being busy
+    for ( int i = 0; i < jepPythonEngine.getNumWorkerThreads(); i++) {
+      InterpreterWrapper aWrapper = jepPythonEngine.getWorkers().get(i);
+      aWrapper.runCommands(1L,i,busyCommandRequest);
+    }
+    for (int i = 0; i < jepPythonEngine.getNumWorkerThreads(); i++) {
+      candidateWrapperForExecution = jepPythonEngine.selectWorkerForCurrentCall(i);
+      assertNull(candidateWrapperForExecution);
+    }
+    Thread.sleep(5000); // ensures other tests in this class can use this engine after this test
+  }
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testWorkerExecutionMode() throws Exception
+  {
+    String methodName = "multiply";
+    List<String> commands = new ArrayList<>();
+    commands.add("def " + methodName + "(firstnum, secondnum):\n" +
+        "\treturn (firstnum * secondnum)\n"); // Note that this cannot be split as multiple commands
+
+    PythonInterpreterRequest<Void> requestForDef = buildRequestObjectForVoidGenericCommand(commands,1000,
+        TimeUnit.MILLISECONDS);
+
+    for (int i = 0; i < jepPythonEngine.getNumWorkerThreads(); i++) {
+      InterpreterWrapper aWrapper = jepPythonEngine.getWorkers().get(i);
+      aWrapper.runCommands(1L,1L,requestForDef);
+    }
+
+    HashMap<String,Object> params = new HashMap<>();
+    params.put("y",3);
+    Map<String,PythonRequestResponse<Long>> response = jepPythonEngine.eval(WorkerExecutionMode.BROADCAST,
+        1L,1L,buildRequestObjectForLongEvalCommand("x=multiply(4,y)",
+        "x", params,1000,TimeUnit.MILLISECONDS,false));
+    for (String aWorkerId: response.keySet()) {
+      assertEquals(12L,response.get(aWorkerId).getPythonInterpreterResponse().getResponse());
+    }
+    assertEquals(jepPythonEngine.getNumWorkerThreads(),response.size()); // ensure all workers responded
+
+    params = new HashMap<>();
+    params.put("y",6);
+    response = jepPythonEngine.eval(WorkerExecutionMode.ANY,
+      1L,1L,buildRequestObjectForLongEvalCommand("x=multiply(4,y)",
+      "x", params,1000,TimeUnit.MILLISECONDS,false));
+    for (String aWorkerId: response.keySet()) {
+      assertEquals(24L,response.get(aWorkerId).getPythonInterpreterResponse().getResponse());
+    }
+    assertEquals(1,response.size()); // ensure exactly one worker responded in ANY mode
+    Thread.sleep(5000); // ensure subsequent tests are not impacted by busy flags from current test
+  }
+
+  @JepPythonTestContext(jepPythonBasedTest = true)
+  @Test
+  public void testPostStartInterpreterLogic() throws Exception
+  {
+    JepPythonEngine pythonEngineForPostInit = new JepPythonEngine("unit-tests-jeppythonengine-preint",
+        5);
+    List<String> commandsForPreInit = new ArrayList<>();
+    commandsForPreInit.add("x=5");
+    PythonRequestResponse<Void> aHistoryCommand = buildRequestResponseObjectForVoidPayload(
+        PythonCommandType.GENERIC_COMMANDS);
+    aHistoryCommand.getPythonInterpreterRequest()
+        .getGenericCommandsRequestPayload().setGenericCommands(commandsForPreInit);
+    aHistoryCommand.getPythonInterpreterRequest().setTimeout(10);
+    aHistoryCommand.getPythonInterpreterRequest().setTimeUnit(TimeUnit.MILLISECONDS);
+    List<PythonRequestResponse> historyOfCommands = new ArrayList<>();
+    historyOfCommands.add(aHistoryCommand);
+    pythonEngineForPostInit.setCommandHistory(historyOfCommands);
+
+    pythonEngineForPostInit.preInitInterpreter(new HashMap<PythonInterpreterConfig,Object>());
+    pythonEngineForPostInit.startInterpreter();
+
+    pythonEngineForPostInit.postStartInterpreter();
+
+    HashMap<String,Object> params = new HashMap<>();
+    params.put("y",4);
+    Map<String,PythonRequestResponse<Long>> resultOfExecution = pythonEngineForPostInit.eval(
+        WorkerExecutionMode.BROADCAST, 1L,1L, buildRequestObjectForLongEvalCommand(
+        "x=x+y","x",params,100,TimeUnit.MILLISECONDS,false));
+    assertEquals(pythonEngineForPostInit.getNumWorkerThreads(),resultOfExecution.size());
+    for (String aWorkerId : resultOfExecution.keySet()) {
+      assertEquals(9L, resultOfExecution.get(aWorkerId).getPythonInterpreterResponse().getResponse());
+    }
+  }
+}
+
+
diff --git a/python/src/test/java/org/apache/apex/malhar/python/test/BasePythonTest.java b/python/src/test/java/org/apache/apex/malhar/python/test/BasePythonTest.java
new file mode 100644
index 0000000000..0807da1fd2
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/test/BasePythonTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.test;
+
+import java.io.File;
+
+import org.junit.Rule;
+
+import org.apache.commons.io.FileUtils;
+
+public class BasePythonTest
+{
+  @Rule
+  public PythonAvailabilityTestRule jepAvailabilityBasedTest = new PythonAvailabilityTestRule();
+
+  protected void migrateFileFromResourcesFolderToTemp(String resourceFileName,String targetFilePath) throws Exception
+  {
+    ClassLoader classLoader = getClass().getClassLoader();
+    File outFile = new File(targetFilePath);
+    FileUtils.copyInputStreamToFile(classLoader.getResourceAsStream(resourceFileName), outFile);
+  }
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/test/JepPythonTestContext.java b/python/src/test/java/org/apache/apex/malhar/python/test/JepPythonTestContext.java
new file mode 100644
index 0000000000..efa3b7cec5
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/test/JepPythonTestContext.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.test;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation marking tests that should run only if JEP (Java Embedded Python) is available at the time
+ * the tests are launched.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface JepPythonTestContext
+{
+
+  boolean jepPythonBasedTest() default false;
+
+}
diff --git a/python/src/test/java/org/apache/apex/malhar/python/test/PythonAvailabilityTestRule.java b/python/src/test/java/org/apache/apex/malhar/python/test/PythonAvailabilityTestRule.java
new file mode 100644
index 0000000000..118e2fec7a
--- /dev/null
+++ b/python/src/test/java/org/apache/apex/malhar/python/test/PythonAvailabilityTestRule.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python.test;
+
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.python.base.jep.BaseJEPTest;
+
+/**
+ * A JUnit rule that bypasses tests which cannot run when no Python (JEP) installation is present.
+ * Such tests run only when the system property jepInstallPath (the JEP library path) is set on the command line.
+ */
+public class PythonAvailabilityTestRule implements TestRule
+{
+
+  private static final Logger LOG = LoggerFactory.getLogger(PythonAvailabilityTestRule.class);
+
+  public static final String JEP_LIBRARY_INSTALLED_SWITCH = "jepInstallPath";
+
+  @Override
+  public Statement apply(Statement base, Description description)
+  {
+    JepPythonTestContext testContext = description.getAnnotation(JepPythonTestContext.class);
+    String jepInstalledStrVal = System.getProperty(JEP_LIBRARY_INSTALLED_SWITCH);
+    boolean jepInstalled = false;
+    if (jepInstalledStrVal != null) {
+      jepInstalled = true;
+      LOG.debug("Using {} as the library path for python interpreter", jepInstalledStrVal);
+    }
+    boolean runThisTest = true; // default is to run the test if no annotation is specified i.e. python is not required
+    if ( testContext != null) {
+      if ( (testContext.jepPythonBasedTest()) && (jepInstalled) ) {
+        runThisTest = true;
+      } else {
+        runThisTest = false;
+      }
+    }
+    if (runThisTest) {
+      if ( (jepInstalled) && (!BaseJEPTest.JEP_INITIALIZED)) {
+        try {
+          BaseJEPTest.initJEPThread();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return base;
+    } else {
+      // bypass the test altogether
+      return new Statement()
+      {
+        @Override
+        public void evaluate() throws Throwable
+        {
+          // Return an empty Statement object for those tests
+        }
+      };
+    }
+  }
+
+}
diff --git a/python/src/test/resources/META-INF/properties.xml b/python/src/test/resources/META-INF/properties.xml
new file mode 100644
index 0000000000..652f981785
--- /dev/null
+++ b/python/src/test/resources/META-INF/properties.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+<configuration>
+  <property>
+    <name>dt.operator.pythonprocessor.attr.JVM_OPTIONS</name>
+    <value> -Djava.library.path=${jepInstallPath}</value>
+  </property>
+</configuration>
diff --git a/python/src/test/resources/factorial.py b/python/src/test/resources/factorial.py
new file mode 100755
index 0000000000..ca471b8d2a
--- /dev/null
+++ b/python/src/test/resources/factorial.py
@@ -0,0 +1,49 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+
+import os
+
+os.getcwd()
+outputfile = 'target/factorial-result.txt'
+number = 5
+
+
+def factorial(num):
+    if num <= 0:
+        return 1
+    else:
+        result = 1
+        for i in range(1, num + 1):
+            result = result * i
+        return result
+
+def writeFile(factorial):
+    fileHandle  = open(outputfile,'w')
+    fileHandle.write(factorial+'\n')
+    fileHandle.flush()
+    fileHandle.close()
+
+
+if __name__ == "__main__":
+    writeFile(str(factorial(number)))
+
+
diff --git a/python/src/test/resources/log4j.properties b/python/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..ecb366871d
--- /dev/null
+++ b/python/src/test/resources/log4j.properties
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.org.apache.apex.malhar.python=INFO
+log4j.logger.org.apache.apex.malhar.python.base=INFO
+log4j.logger.org.apache.apex.malhar.python.base.jep=INFO
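
The run/skip decision made by PythonAvailabilityTestRule.apply() in the diff above can be restated as a small truth table. The following is only a sketch in Python, not code from the pull request: the function name should_run and its parameters are hypothetical, and the jepInstallPath system property is modelled as an optional string. It is meant to make one consequence of the rule visible: a test annotated with jepPythonBasedTest left at its default (false) appears to be skipped regardless of whether JEP is installed.

```python
from typing import Optional


def should_run(jep_python_based_test: Optional[bool],
               jep_install_path: Optional[str]) -> bool:
    """Restate the run/skip decision of PythonAvailabilityTestRule.apply().

    jep_python_based_test is None when the test method carries no
    @JepPythonTestContext annotation (hypothetical modelling choice).
    jep_install_path stands in for the jepInstallPath system property
    and is None when the property is not set.
    """
    # The rule treats "property is set" as "JEP is installed".
    jep_installed = jep_install_path is not None
    if jep_python_based_test is None:
        # Unannotated tests do not need Python and always run.
        return True
    # Annotated tests run only if flagged as JEP-based and JEP is available.
    return jep_python_based_test and jep_installed


if __name__ == "__main__":
    # Print the full truth table; "/opt/jep" is a placeholder path.
    for flag in (None, False, True):
        for path in (None, "/opt/jep"):
            print(flag, path, should_run(flag, path))
```

Read this way, the annotation acts as an opt-in gate: only methods marked jepPythonBasedTest = true are ever executed under the rule once the annotation is present, and then only when the property is supplied.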


 



> Operator that can execute python code
> -------------------------------------
>
>                 Key: APEXMALHAR-2260
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2260
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Thomas Weise
>            Assignee: Ananth
>            Priority: Major
>              Labels: roadmap
>
> Support execution of Python code in an operator.
> https://lists.apache.org/thread.html/9837b1dee8f909ed400c6030ce5c6a94a12f43183718019dd0bfd228@%3Cdev.apex.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)