How to interact with Akka Actors via Akka HTTP (Java)












0















Topic



I would like to interact with an Akka Actor via Akka HTTP. The idea is to have a system where a HTTP client calls an Akka HTTP server method, which handles the request to an Akka Actor. The actor processes the message and responds back to the caller (Akka HTTP), which answers to the HTTP client.
I managed to do as described above, but I think I am not doing it correctly since my implementation seems blocking.



I explain better: if I make many concurrent HTTP requests, I see that Akka HTTP "makes a queue", thus waiting for the actor to process a request before sending it the following.



What I would like to obtain instead is that the Akka HTTP server forwards the requests coming from HTTP clients immediately to the target akka actor, without waiting for the actor to end the elaboration.
I would like to use the actor mailbox-capacity parameter to determine how big is the message queue and reject messages if they are too many.



Thus I would need a way to have Akka HTTP wait asynchrounsly for the actor response.



I know that the mailbox-capacity is working correctly because If I instead make many requests to my actor using a simple actor2.tell("Prova1", system.deadLetters()) (just for testing), requests exceeding the mailbox size are correctly rejected.






References



In order test my system I created a simple configuration following the minimal examples provided by akka documentation.
This for akka http:
https://doc.akka.io/docs/akka-http/current/routing-dsl/index.html#minimal-example



and the following for creating my actor:
https://doc.akka.io/docs/akka/current/actors.html#creating-actors






My code



The first thing I did was to create a system with one actor (actor1) configure akka HTTP as follows:



public class TestActor {

private static ActorSystem system;

public static void main(String args) throws InterruptedException
{
String httpBindAddress = "0.0.0.0";
int httpPort = 8086;
system = ActorSystem.create("deupnp");
ActorMaterializer materializer = ActorMaterializer.create(system);
Http http = Http.get(system);
AllDirectives app = new AllDirectives() {
};

Route routeActor = app.get(() ->
app.pathPrefix("mysuburl", () ->
app.pathPrefix(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, actor ->
app.path(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, message ->
app.onSuccess(() ->
CompletableFuture.supplyAsync(() -> actorFunctionCall(actor, message)), response ->
app.complete(StatusCodes.get(200), response))))));

Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.route(routeActor).flow(system, materializer);
CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost(httpBindAddress, httpPort), materializer);

// create system with one actor
ActorRef actor1 = system.actorOf(Props.create(ActorTest.class,"actor1").withMailbox("my-mailbox"),"actor1");
}

private static String actorFunctionCall(String actor, String message)
{
try {
Inbox inbox = Inbox.create(system);
system.actorSelection("user/"+actor).tell(message, inbox.getRef());
String response = (String) inbox.receive(Duration.create(20000, TimeUnit.SECONDS));
return response;
} catch (Exception e) {
//return new ResponseMessage(204,"Error");
e.printStackTrace();
return null;
}
}
}


where my ActorTest is the following:



public class ActorTest extends AbstractActor {

private String myName = "";

public ActorTest(String nome){
this.myName = nome;
}

@Override
public void preStart()
{
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class,
message -> {
Thread.sleep(5000l);
System.out.println(this.getClass().getName() + " >> " + myName + " >> " + message);
})
.matchAny(mex->{
System.out.println("Error");
})
.build();
}
}


my application.conf is very simple:



akka
{
stdout-loglevel = "DEBUG"
loglevel = "DEBUG"
actor {
default-dispatcher {
throughput = 10
}
}
}

my-mailbox {
mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
mailbox-capacity = 1
}





Expected results



As you can see, with mailbox-capacity = 1 I would expect that,
if I make more than 1 concurrent requests, only one is processed and the rest is discarded.



I think the code above is not correct for what I want to obtain, since I am using Akka HTTP routing in order to receive HTTP requests on http://127.0.0.1/mysuburl/actor1/my_msg and then use Inbox to send message to the actor and wait for the response.



So my question is: which is the correct way to link my Akka HTTP request to my Akka Actor actor 1 in an asynchronous way?



Please let me know if you need further details.



Note



I even read the following article:
https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html



which explains how to create a finite number of threads in order to deal with multiple blocking requests, but I think this only "mitigates" the effects of my code, which is blocking, but must be written in a way which is not blocking.










share|improve this question



























    0















    Topic



    I would like to interact with an Akka Actor via Akka HTTP. The idea is to have a system where a HTTP client calls an Akka HTTP server method, which handles the request to an Akka Actor. The actor processes the message and responds back to the caller (Akka HTTP), which answers to the HTTP client.
    I managed to do as described above, but I think I am not doing it correctly since my implementation seems blocking.



    I explain better: if I make many concurrent HTTP requests, I see that Akka HTTP "makes a queue", thus waiting for the actor to process a request before sending it the following.



    What I would like to obtain instead is that the Akka HTTP server forwards the requests coming from HTTP clients immediately to the target akka actor, without waiting for the actor to end the elaboration.
    I would like to use the actor mailbox-capacity parameter to determine how big is the message queue and reject messages if they are too many.



    Thus I would need a way to have Akka HTTP wait asynchrounsly for the actor response.



    I know that the mailbox-capacity is working correctly because If I instead make many requests to my actor using a simple actor2.tell("Prova1", system.deadLetters()) (just for testing), requests exceeding the mailbox size are correctly rejected.






    References



    In order test my system I created a simple configuration following the minimal examples provided by akka documentation.
    This for akka http:
    https://doc.akka.io/docs/akka-http/current/routing-dsl/index.html#minimal-example



    and the following for creating my actor:
    https://doc.akka.io/docs/akka/current/actors.html#creating-actors






    My code



    The first thing I did was to create a system with one actor (actor1) configure akka HTTP as follows:



    public class TestActor {

    private static ActorSystem system;

    public static void main(String args) throws InterruptedException
    {
    String httpBindAddress = "0.0.0.0";
    int httpPort = 8086;
    system = ActorSystem.create("deupnp");
    ActorMaterializer materializer = ActorMaterializer.create(system);
    Http http = Http.get(system);
    AllDirectives app = new AllDirectives() {
    };

    Route routeActor = app.get(() ->
    app.pathPrefix("mysuburl", () ->
    app.pathPrefix(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, actor ->
    app.path(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, message ->
    app.onSuccess(() ->
    CompletableFuture.supplyAsync(() -> actorFunctionCall(actor, message)), response ->
    app.complete(StatusCodes.get(200), response))))));

    Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.route(routeActor).flow(system, materializer);
    CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost(httpBindAddress, httpPort), materializer);

    // create system with one actor
    ActorRef actor1 = system.actorOf(Props.create(ActorTest.class,"actor1").withMailbox("my-mailbox"),"actor1");
    }

    private static String actorFunctionCall(String actor, String message)
    {
    try {
    Inbox inbox = Inbox.create(system);
    system.actorSelection("user/"+actor).tell(message, inbox.getRef());
    String response = (String) inbox.receive(Duration.create(20000, TimeUnit.SECONDS));
    return response;
    } catch (Exception e) {
    //return new ResponseMessage(204,"Error");
    e.printStackTrace();
    return null;
    }
    }
    }


    where my ActorTest is the following:



    public class ActorTest extends AbstractActor {

    private String myName = "";

    public ActorTest(String nome){
    this.myName = nome;
    }

    @Override
    public void preStart()
    {
    }

    @Override
    public Receive createReceive() {
    return receiveBuilder()
    .match(String.class,
    message -> {
    Thread.sleep(5000l);
    System.out.println(this.getClass().getName() + " >> " + myName + " >> " + message);
    })
    .matchAny(mex->{
    System.out.println("Error");
    })
    .build();
    }
    }


    my application.conf is very simple:



    akka
    {
    stdout-loglevel = "DEBUG"
    loglevel = "DEBUG"
    actor {
    default-dispatcher {
    throughput = 10
    }
    }
    }

    my-mailbox {
    mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
    mailbox-capacity = 1
    }





    Expected results



    As you can see, with mailbox-capacity = 1 I would expect that,
    if I make more than 1 concurrent requests, only one is processed and the rest is discarded.



    I think the code above is not correct for what I want to obtain, since I am using Akka HTTP routing in order to receive HTTP requests on http://127.0.0.1/mysuburl/actor1/my_msg and then use Inbox to send message to the actor and wait for the response.



    So my question is: which is the correct way to link my Akka HTTP request to my Akka Actor actor 1 in an asynchronous way?



    Please let me know if you need further details.



    Note



    I even read the following article:
    https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html



    which explains how to create a finite number of threads in order to deal with multiple blocking requests, but I think this only "mitigates" the effects of my code, which is blocking, but must be written in a way which is not blocking.










    share|improve this question

























      0












      0








      0








      Topic



      I would like to interact with an Akka Actor via Akka HTTP. The idea is to have a system where a HTTP client calls an Akka HTTP server method, which handles the request to an Akka Actor. The actor processes the message and responds back to the caller (Akka HTTP), which answers to the HTTP client.
      I managed to do as described above, but I think I am not doing it correctly since my implementation seems blocking.



      I explain better: if I make many concurrent HTTP requests, I see that Akka HTTP "makes a queue", thus waiting for the actor to process a request before sending it the following.



      What I would like to obtain instead is that the Akka HTTP server forwards the requests coming from HTTP clients immediately to the target akka actor, without waiting for the actor to end the elaboration.
      I would like to use the actor mailbox-capacity parameter to determine how big is the message queue and reject messages if they are too many.



      Thus I would need a way to have Akka HTTP wait asynchrounsly for the actor response.



      I know that the mailbox-capacity is working correctly because If I instead make many requests to my actor using a simple actor2.tell("Prova1", system.deadLetters()) (just for testing), requests exceeding the mailbox size are correctly rejected.






      References



      In order test my system I created a simple configuration following the minimal examples provided by akka documentation.
      This for akka http:
      https://doc.akka.io/docs/akka-http/current/routing-dsl/index.html#minimal-example



      and the following for creating my actor:
      https://doc.akka.io/docs/akka/current/actors.html#creating-actors






      My code



      The first thing I did was to create a system with one actor (actor1) configure akka HTTP as follows:



      public class TestActor {

      private static ActorSystem system;

      public static void main(String args) throws InterruptedException
      {
      String httpBindAddress = "0.0.0.0";
      int httpPort = 8086;
      system = ActorSystem.create("deupnp");
      ActorMaterializer materializer = ActorMaterializer.create(system);
      Http http = Http.get(system);
      AllDirectives app = new AllDirectives() {
      };

      Route routeActor = app.get(() ->
      app.pathPrefix("mysuburl", () ->
      app.pathPrefix(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, actor ->
      app.path(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, message ->
      app.onSuccess(() ->
      CompletableFuture.supplyAsync(() -> actorFunctionCall(actor, message)), response ->
      app.complete(StatusCodes.get(200), response))))));

      Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.route(routeActor).flow(system, materializer);
      CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost(httpBindAddress, httpPort), materializer);

      // create system with one actor
      ActorRef actor1 = system.actorOf(Props.create(ActorTest.class,"actor1").withMailbox("my-mailbox"),"actor1");
      }

      private static String actorFunctionCall(String actor, String message)
      {
      try {
      Inbox inbox = Inbox.create(system);
      system.actorSelection("user/"+actor).tell(message, inbox.getRef());
      String response = (String) inbox.receive(Duration.create(20000, TimeUnit.SECONDS));
      return response;
      } catch (Exception e) {
      //return new ResponseMessage(204,"Error");
      e.printStackTrace();
      return null;
      }
      }
      }


      where my ActorTest is the following:



      public class ActorTest extends AbstractActor {

      private String myName = "";

      public ActorTest(String nome){
      this.myName = nome;
      }

      @Override
      public void preStart()
      {
      }

      @Override
      public Receive createReceive() {
      return receiveBuilder()
      .match(String.class,
      message -> {
      Thread.sleep(5000l);
      System.out.println(this.getClass().getName() + " >> " + myName + " >> " + message);
      })
      .matchAny(mex->{
      System.out.println("Error");
      })
      .build();
      }
      }


      my application.conf is very simple:



      akka
      {
      stdout-loglevel = "DEBUG"
      loglevel = "DEBUG"
      actor {
      default-dispatcher {
      throughput = 10
      }
      }
      }

      my-mailbox {
      mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
      mailbox-capacity = 1
      }





      Expected results



      As you can see, with mailbox-capacity = 1 I would expect that,
      if I make more than 1 concurrent requests, only one is processed and the rest is discarded.



      I think the code above is not correct for what I want to obtain, since I am using Akka HTTP routing in order to receive HTTP requests on http://127.0.0.1/mysuburl/actor1/my_msg and then use Inbox to send message to the actor and wait for the response.



      So my question is: which is the correct way to link my Akka HTTP request to my Akka Actor actor 1 in an asynchronous way?



      Please let me know if you need further details.



      Note



      I even read the following article:
      https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html



      which explains how to create a finite number of threads in order to deal with multiple blocking requests, but I think this only "mitigates" the effects of my code, which is blocking, but must be written in a way which is not blocking.










      share|improve this question














      Topic



      I would like to interact with an Akka Actor via Akka HTTP. The idea is to have a system where a HTTP client calls an Akka HTTP server method, which handles the request to an Akka Actor. The actor processes the message and responds back to the caller (Akka HTTP), which answers to the HTTP client.
      I managed to do as described above, but I think I am not doing it correctly since my implementation seems blocking.



      I explain better: if I make many concurrent HTTP requests, I see that Akka HTTP "makes a queue", thus waiting for the actor to process a request before sending it the following.



      What I would like to obtain instead is that the Akka HTTP server forwards the requests coming from HTTP clients immediately to the target akka actor, without waiting for the actor to end the elaboration.
      I would like to use the actor mailbox-capacity parameter to determine how big is the message queue and reject messages if they are too many.



      Thus I would need a way to have Akka HTTP wait asynchrounsly for the actor response.



      I know that the mailbox-capacity is working correctly because If I instead make many requests to my actor using a simple actor2.tell("Prova1", system.deadLetters()) (just for testing), requests exceeding the mailbox size are correctly rejected.






      References



      In order test my system I created a simple configuration following the minimal examples provided by akka documentation.
      This for akka http:
      https://doc.akka.io/docs/akka-http/current/routing-dsl/index.html#minimal-example



      and the following for creating my actor:
      https://doc.akka.io/docs/akka/current/actors.html#creating-actors






      My code



      The first thing I did was to create a system with one actor (actor1) configure akka HTTP as follows:



      public class TestActor {

      private static ActorSystem system;

      public static void main(String args) throws InterruptedException
      {
      String httpBindAddress = "0.0.0.0";
      int httpPort = 8086;
      system = ActorSystem.create("deupnp");
      ActorMaterializer materializer = ActorMaterializer.create(system);
      Http http = Http.get(system);
      AllDirectives app = new AllDirectives() {
      };

      Route routeActor = app.get(() ->
      app.pathPrefix("mysuburl", () ->
      app.pathPrefix(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, actor ->
      app.path(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, message ->
      app.onSuccess(() ->
      CompletableFuture.supplyAsync(() -> actorFunctionCall(actor, message)), response ->
      app.complete(StatusCodes.get(200), response))))));

      Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.route(routeActor).flow(system, materializer);
      CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost(httpBindAddress, httpPort), materializer);

      // create system with one actor
      ActorRef actor1 = system.actorOf(Props.create(ActorTest.class,"actor1").withMailbox("my-mailbox"),"actor1");
      }

      private static String actorFunctionCall(String actor, String message)
      {
      try {
      Inbox inbox = Inbox.create(system);
      system.actorSelection("user/"+actor).tell(message, inbox.getRef());
      String response = (String) inbox.receive(Duration.create(20000, TimeUnit.SECONDS));
      return response;
      } catch (Exception e) {
      //return new ResponseMessage(204,"Error");
      e.printStackTrace();
      return null;
      }
      }
      }


      where my ActorTest is the following:



      public class ActorTest extends AbstractActor {

      private String myName = "";

      public ActorTest(String nome){
      this.myName = nome;
      }

      @Override
      public void preStart()
      {
      }

      @Override
      public Receive createReceive() {
      return receiveBuilder()
      .match(String.class,
      message -> {
      Thread.sleep(5000l);
      System.out.println(this.getClass().getName() + " >> " + myName + " >> " + message);
      })
      .matchAny(mex->{
      System.out.println("Error");
      })
      .build();
      }
      }


      my application.conf is very simple:



      akka
      {
      stdout-loglevel = "DEBUG"
      loglevel = "DEBUG"
      actor {
      default-dispatcher {
      throughput = 10
      }
      }
      }

      my-mailbox {
      mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
      mailbox-capacity = 1
      }





      Expected results



      As you can see, with mailbox-capacity = 1 I would expect that,
      if I make more than 1 concurrent requests, only one is processed and the rest is discarded.



      I think the code above is not correct for what I want to obtain, since I am using Akka HTTP routing in order to receive HTTP requests on http://127.0.0.1/mysuburl/actor1/my_msg and then use Inbox to send message to the actor and wait for the response.



      So my question is: which is the correct way to link my Akka HTTP request to my Akka Actor actor 1 in an asynchronous way?



      Please let me know if you need further details.



      Note



      I even read the following article:
      https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html



      which explains how to create a finite number of threads in order to deal with multiple blocking requests, but I think this only "mitigates" the effects of my code, which is blocking, but must be written in a way which is not blocking.







      akka-http akka-actor






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Jan 19 at 13:58









      gc.mntgc.mnt

      537




      537
























          0






          active

          oldest

          votes











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54267836%2fhow-to-interact-with-akka-actors-via-akka-http-java%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          0






          active

          oldest

          votes








          0






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes
















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54267836%2fhow-to-interact-with-akka-actors-via-akka-http-java%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          How fix org.hibernate.TransientPropertyValueException

          Updating UILabel text programmatically using a function

          Cloud Functions - OpenCV Videocapture Read method fails for larger files from cloud storage