[jira] [Commented] (APEXCORE-810) Concurrent modification exception during connection cleanup in buffer server

JIRA jira@apache.org

    [ https://issues.apache.org/jira/browse/APEXCORE-810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514868#comment-16514868 ]

ASF GitHub Bot commented on APEXCORE-810:
-----------------------------------------

vrozov closed pull request #595: APEXCORE-810 Fixing race condition between publisher and subscriber teardowns
URL: https://github.com/apache/apex-core/pull/595

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 3e8846d1e9..af5db09b6d 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -115,12 +115,7 @@ public void addConnection(WriteOnlyClient connection)
    */
   public void removeChannel(WriteOnlyClient client)
   {
-    for (PhysicalNode pn : physicalNodes) {
-      if (pn.getClient() == client) {
-        physicalNodes.remove(pn);
-        break;
-      }
-    }
+    physicalNodes.removeIf(node -> (node.getClient().equals(client)));
   }
 
   /**
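
The `removeChannel` hunk replaces a hand-written search-and-remove loop with `Collection.removeIf`. A minimal sketch of that shape follows. `RemoveChannelSketch`, `Client` and `PhysicalNode` are hypothetical stand-ins, not the buffer server's `LogicalNode`, `WriteOnlyClient` and `PhysicalNode`, and `physicalNodes` is assumed to be a plain `HashSet` (the `HashMap$KeyIterator` frame in the issue's stack trace points that way). The patch also changes two behaviours of the old loop: `removeIf` drops every matching node rather than stopping at the first, and clients are compared with `equals` instead of `==`.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class RemoveChannelSketch {
  // Hypothetical stand-in for the buffer server's WriteOnlyClient.
  static final class Client {
    final String name;

    Client(String name) {
      this.name = name;
    }
  }

  // Hypothetical stand-in for PhysicalNode: wraps the client it writes to.
  static final class PhysicalNode {
    private final Client client;

    PhysicalNode(Client client) {
      this.client = client;
    }

    Client getClient() {
      return client;
    }
  }

  // Assumed to be a plain HashSet, as the HashMap$KeyIterator frame suggests.
  final Set<PhysicalNode> physicalNodes = new HashSet<>();

  // Same shape as the patched removeChannel(): removeIf walks the set with its
  // own iterator and removes every node whose client matches.
  void removeChannel(Client client) {
    physicalNodes.removeIf(node -> Objects.equals(node.getClient(), client));
  }

  public static void main(String[] args) {
    RemoveChannelSketch ln = new RemoveChannelSketch();
    Client a = new Client("a");
    Client b = new Client("b");
    ln.physicalNodes.add(new PhysicalNode(a));
    ln.physicalNodes.add(new PhysicalNode(a)); // second node for the same client
    ln.physicalNodes.add(new PhysicalNode(b));
    ln.removeChannel(a);
    System.out.println(ln.physicalNodes.size()); // prints 1
  }
}
```

The sketch uses `Objects.equals` so that a node with a `null` client does not throw; because the stand-in `Client` does not override `equals`, the comparison behaves like the original identity check. `removeIf` on a `HashSet` is still not thread-safe: it avoids misusing an iterator within one thread, but does nothing to stop a second thread from modifying the set mid-scan.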
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index c5700f2690..6332a18804 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -24,9 +24,6 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map.Entry;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -117,9 +114,11 @@ public void registered(SelectionKey key)
   @Override
   public void unregistered(SelectionKey key)
   {
+    logger.debug("Unregistered {}", this);
     for (LogicalNode ln : subscriberGroups.values()) {
       ln.boot();
     }
+    super.unregistered(key);
     /*
      * There may be un-register tasks scheduled to run on the event loop that use serverHelperExecutor.
      */
@@ -860,41 +859,32 @@ private void teardown()
       }
       torndown = true;
 
-      /*
-       * if the publisher unregistered, all the downstream guys are going to be unregistered anyways
-       * in our world. So it makes sense to kick them out proactively. Otherwise these clients since
-       * are not being written to, just stick around till the next publisher shows up and eat into
-       * the data it's publishing for the new subscribers.
-       */
-
-      /**
-       * since the publisher server died, the queue which it was using would stop pumping the data unless
-       * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node
-       * with the same identifier as the one which just died.
-       */
-      if (publisherChannels.containsValue(this)) {
-        final Iterator<Entry<String, AbstractLengthPrependerClient>> i = publisherChannels.entrySet().iterator();
-        while (i.hasNext()) {
-          if (i.next().getValue() == this) {
-            i.remove();
-            break;
-          }
-        }
-      }
-
-      ArrayList<LogicalNode> list = new ArrayList<>();
-      String publisherIdentifier = datalist.getIdentifier();
-      Iterator<LogicalNode> iterator = subscriberGroups.values().iterator();
-      while (iterator.hasNext()) {
-        LogicalNode ln = iterator.next();
-        if (publisherIdentifier.equals(ln.getUpstream())) {
-          list.add(ln);
+      serverHelperExecutor.submit(() ->
+      {
+        /*
+         * if the publisher unregistered, all the downstream guys are going to be unregistered anyways
+         * in our world. So it makes sense to kick them out proactively. Otherwise these clients since
+         * are not being written to, just stick around till the next publisher shows up and eat into
+         * the data it's publishing for the new subscribers.
+         */
+
+        /**
+         * since the publisher server died, the queue which it was using would stop pumping the data unless
+         * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node
+         * with the same identifier as the one which just died.
+         */
+        String publisherIdentifier = datalist.getIdentifier();
+        if (!publisherChannels.remove(publisherIdentifier, Publisher.this)) {
+          logger.warn("{} could not be removed from channels", Publisher.this);
         }
-      }
 
-      for (LogicalNode ln : list) {
-        ln.boot();
-      }
+        subscriberGroups.forEach((type, ln) ->  {
+          if (publisherIdentifier.equals(ln.getUpstream())) {
+            logger.debug("Booting logical node {} from publisher", ln);
+            ln.boot();
+          }
+        });
+      });
     }
 
   }
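
In the `Publisher.teardown` hunk, the scan over `publisherChannels.entrySet()` is replaced with the two-argument `remove(key, value)`, which on a `ConcurrentMap` checks the current value and removes the entry as one atomic step. The sketch below assumes, as the patch does, that the map is keyed by the publisher's data-list identifier; `ConditionalRemoveSketch`, `Publisher` and `unregister` are hypothetical, and printing to `System.err` stands in for the patch's `logger.warn`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConditionalRemoveSketch {
  // Hypothetical stand-in for a publisher connection. It does not override
  // equals(), so the map compares values by identity.
  static final class Publisher {
    final String name;

    Publisher(String name) {
      this.name = name;
    }

    @Override
    public String toString() {
      return "Publisher(" + name + ")";
    }
  }

  // Removes the mapping for identifier only if it still points at self.
  // The value check and the removal happen atomically inside the map.
  static boolean unregister(ConcurrentMap<String, Publisher> channels, String identifier, Publisher self) {
    boolean removed = channels.remove(identifier, self);
    if (!removed) {
      System.err.println(self + " could not be removed from channels");
    }
    return removed;
  }

  public static void main(String[] args) {
    ConcurrentMap<String, Publisher> publisherChannels = new ConcurrentHashMap<>();
    Publisher oldPublisher = new Publisher("old");
    Publisher newPublisher = new Publisher("new");
    publisherChannels.put("2.output.1", oldPublisher);
    // A replacement publisher registers under the same identifier before
    // the old one's teardown runs.
    publisherChannels.put("2.output.1", newPublisher);
    System.out.println(unregister(publisherChannels, "2.output.1", oldPublisher)); // prints false
    System.out.println(publisherChannels.get("2.output.1")); // prints Publisher(new)
  }
}
```

With a separate check and removal there is a window in which the mapping can change between the two steps; the conditional form leaves a replacement publisher in place and simply reports that nothing was removed. It also looks the entry up by key instead of scanning every entry.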

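The same hunk moves the publisher's cleanup (removing itself from `publisherChannels` and booting downstream logical nodes) into a task submitted to `serverHelperExecutor`. The issue's stack trace shows subscriber teardown (`Server$3.run`) running on a pooled executor thread. If subscriber teardown goes through the same executor and that executor runs one task at a time (both assumptions here, modelled with `Executors.newSingleThreadExecutor()`), the two teardowns can no longer touch shared, non-thread-safe collections at the same moment. `SerializedTeardownSketch` and `runTeardowns` are hypothetical; the integer "nodes" only imitate the pattern.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SerializedTeardownSketch {
  /**
   * Submits interleaved "subscriber teardown" and "publisher teardown" tasks
   * that all touch one plain (non-thread-safe) HashSet. A single-threaded
   * executor runs them one at a time in submission order, so no task iterates
   * the set while another task is modifying it. Returns how many nodes remain.
   */
  static int runTeardowns(int n) throws Exception {
    Set<Integer> nodes = new HashSet<>();
    for (int i = 0; i < n; i++) {
      nodes.add(i);
    }
    int[] booted = {0}; // only ever touched by the single worker thread
    ExecutorService helper = Executors.newSingleThreadExecutor();
    List<Future<?>> pending = new ArrayList<>();
    try {
      for (int i = 0; i < n; i++) {
        final int id = i;
        // Subscriber teardown: remove this subscriber's node.
        pending.add(helper.submit(() -> {
          nodes.removeIf(x -> x == id);
        }));
        // Publisher teardown: walk the remaining nodes and "boot" each one.
        pending.add(helper.submit(() -> {
          for (Integer x : nodes) {
            booted[0]++;
          }
        }));
      }
      for (Future<?> f : pending) {
        f.get(); // rethrows, wrapped, any exception raised inside a task
      }
    } finally {
      helper.shutdown();
    }
    return nodes.size();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(runTeardowns(1_000)); // prints 0
  }
}
```

Swapping `newSingleThreadExecutor()` for a multi-threaded pool would let the removal and the iteration overlap; the run could then fail with a `ConcurrentModificationException` (surfaced by `Future.get()` as an `ExecutionException`) or corrupt the set without any error.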

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure.


> Concurrent modification exception during connection cleanup in buffer server
> ----------------------------------------------------------------------------
>
>                 Key: APEXCORE-810
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-810
>             Project: Apache Apex Core
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>            Priority: Minor
>
> {noformat}
> ERROR com.datatorrent.bufferserver.server.Server: Buffer server Server@56cfec7c{address=/0:0:0:0:0:0:0:0:45081} failed to tear down subscriber Subscriber@2ff22212{ln=LogicalNode@36d87f9eidentifier=tcp://xxxxxx:45081/2.output.1, upstream=2.output.1, group=rand_console/3.input, partitions=[], iterator=com.datatorrent.bufferserver.internal.DataList$DataListIterator@6caeed6a{da=com.datatorrent.bufferserver.internal.DataList$Block@506501e4{identifier=2.output.1, data=16777216, readingOffset=0, writingOffset=1822, starting_window=59dc9c3000000001, ending_window=59dc9c3000000055, refCount=2, uniqueIdentifier=0, next=null, future=null}}}}.java.util.ConcurrentModificationException
> at java.util.HashMap$HashIterator.nextEntry(HashMap.java:922)
> at java.util.HashMap$KeyIterator.next(HashMap.java:956)
> at com.datatorrent.bufferserver.internal.LogicalNode.removeChannel(LogicalNode.java:118)
> at com.datatorrent.bufferserver.server.Server$3.run(Server.java:410)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
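
The old `removeChannel` loop calls `break` right after `physicalNodes.remove(pn)`, so on its own it never advances the iterator after a structural change. The `ConcurrentModificationException` above therefore points to another thread modifying the set while this loop was iterating it, consistent with the race between publisher and subscriber teardowns named in the PR title. The sketch below reproduces that interleaving deterministically in a single thread; `FailFastDemo` and `interleavedModificationThrows` are hypothetical. With real threads, `HashMap`'s fail-fast check is only best-effort, so an unsynchronized race may also corrupt the set without throwing.

```java
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class FailFastDemo {
  /**
   * Simulates, in one thread, a cleanup loop that has started iterating a
   * HashSet when another party (standing in for a second thread) adds an
   * element. Returns true if the iterator's fail-fast check fires on the
   * next call to next(). Requires set.size() >= 2 and added not in the set.
   */
  static boolean interleavedModificationThrows(Set<String> set, String added) {
    Iterator<String> it = set.iterator();
    it.next();      // the cleanup loop has visited its first element
    set.add(added); // the "other thread" changes the set's structure
    try {
      it.next();    // the iterator sees that the modification count changed
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Set<String> clients = new HashSet<>(Arrays.asList("c1", "c2", "c3"));
    System.out.println(interleavedModificationThrows(clients, "c4")); // prints true
  }
}
```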



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)