Cannot get variable values inside event sourced entity

Hello

I have an event sourced entity (TokenRefreshBaseEntity). When it receives TokenRefreshServiceCommand, it makes a JSON call using Http().singleRequest. After the response, it should set theProcessResultNewAccessToken and theProcessResultNewRefreshToken and persist them with some other variables. Although the HTTP request successfully returns the respective values of m.access_token and m.refresh_token, they are not assigned to the variables theProcessResultNewAccessToken and theProcessResultNewRefreshToken within onComplete of the responseFuture. The documentation (Request-Level Client-Side API • Akka HTTP) says that if this is an actor I should use ".pipeTo(self)". But this is not an actor; it is an entity object called from main like: TokenRefreshBaseEntity.init(system)

Can you help me find out why I cannot assign values to the vars inside the future callback?

Regards, Ozcan

private def handleCommand(ServiceHandleCommandEntityIdParam: String, state: State, command: Command): ReplyEffect[Event, State] =
{
log.info("TokenRefreshAllRefreshedTokensProjectionHandler Pre OK")

command match
{
  case TokenRefreshServiceCommand(ServiceHandleCommandEntityIdParam,grantType, refreshToken, stateScope, userName, password, clientId, clientSecret, replyTo) =>

    log.info("TokenRefreshAllRefreshedTokensProjectionHandler OK")

    var theProcessResultNewAccessToken = "EMPTY"
    var theProcessResultNewRefreshToken="EMPTY"
    val theProcessResultTokenType="EMPTY"
    val theProcessResultExpiresIn="EMPTY"

    val dataAsJson = "{\n\"tokenData\":{\n\"refresh_token\":\"" + refreshToken +  "\"}\n}"
    println(dataAsJson)

    implicit val formats = DefaultFormats
    case class TokenQueryResult(refresh_token: String, access_token: String)

    val request = HttpRequest(
      method = HttpMethods.POST,
      uri = "http://alpha.website.com/KRSRefreshToken/",
      entity = HttpEntity(
        ContentTypes.`application/json`,
        dataAsJson
      )
    )

    implicit val system = ActorSystem(Behaviors.empty, "SingleRequest")
    implicit val executionContext = system.executionContext
    log.info("request {}", request)

    val responseFuture = Http().singleRequest(request)
      .flatMap(_.entity.toStrict(2.seconds))
      .map(_.data.utf8String)

    responseFuture
      .onComplete {
        case Success(res) =>log.info(s"Title: yields content: ${res}")
          val json = parse(res)
          val m = json.extract[TokenQueryResult]
          theProcessResultNewAccessToken=m.access_token
          println(m.access_token)
          theProcessResultNewRefreshToken=m.refresh_token
          println(m.refresh_token)
        case Failure(_) => log.info(s"Done with title err")
          theProcessResultNewAccessToken="NOTHING"
          theProcessResultNewRefreshToken="NOTHING"
      }

    Effect
        .persist(TokenRefreshServiceDoneEvent(ServiceHandleCommandEntityIdParam, grantType, refreshToken, stateScope, userName, password, clientId, clientSecret, theProcessResultNewAccessToken,theProcessResultTokenType,theProcessResultExpiresIn,theProcessResultNewRefreshToken))
        .thenReply(replyTo)
        {
           someCommandDoneState =>
            StatusReply.Success(TokenRefreshServiceCommandResult(
              someCommandDoneState.ServiceStateEntityId, someCommandDoneState.ServiceStateGrantType ,someCommandDoneState.ServiceStateRefreshToken,someCommandDoneState.ServiceStateScope
              ,someCommandDoneState.ServiceStateUserName,someCommandDoneState.ServiceStatePassword,someCommandDoneState.ServiceStateClientId,someCommandDoneState.ServiceStateClientSecret
              , theProcessResultNewAccessToken,theProcessResultTokenType,theProcessResultExpiresIn,theProcessResultNewRefreshToken))
        }
  case Get(replyTo) =>
    Effect.reply(replyTo)(state.toSummary)
}

}

Hi Ozcan,

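The lambda you have passed to onComplete will execute on some other thread once the future completes. By then handleCommand has already returned, so Effect.persist was built with the values the vars held at that moment ("EMPTY").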
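The timing can be reproduced without Akka at all. The following is only a minimal sketch using the Scala standard library; the Promise stands in for the HTTP response, and the names CallbackTimingDemo, token and readImmediately are made up for illustration.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CallbackTimingDemo {
  // Returns (value read right after registering the callback,
  //          value the callback received once the future completed).
  def run(): (String, String) = {
    val response = Promise[String]() // hypothetical stand-in for the HTTP response
    var token = "EMPTY"

    // Same shape as the onComplete in the question: the lambda mutates a var.
    val callbackDone = response.future.map { body =>
      token = body
      body
    }

    // This line plays the role of Effect.persist(...): it runs before the
    // response has arrived, so it still sees the initial value.
    val readImmediately = token

    response.success("new-token") // the response arrives only now
    val seenByCallback = Await.result(callbackDone, 1.second)
    (readImmediately, seenByCallback)
  }
}
```

Here the first element of the result is still the initial value even though the callback later receives the real one, which is the same situation as persisting theProcessResultNewAccessToken before the response is in.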

Akka only gives guarantees that you can safely work with mutable state when it is accessed as a reaction to a message to the actor, so you will either have to make that state immutable, which is safe to interact with across threads, or use pipeToSelf to turn the completed future value into a message that the actor can act on. More about this in the docs here

EventSourced and DurableState “entities” are also actors, so the exact same applies for those.

In general we recommend that you factor out future composition (map/onComplete etc.) into static/companion object methods to not accidentally close over internal actor state like this.


Hello
Thank you for the response. Can you give me simple example of factoring out a future composition into static/companion object methods?
Regards, Özcan

I’m afraid I don’t have a good sample at hand.

Make sure that every call on a Future that takes a lambda to execute when the future completes successfully or fails, such as map, flatMap or onComplete, is made in a separate method on a top level object like the companion.

That way you will notice if it depends on some value from the actor. If that value is immutable you can pass it as a parameter; if it is mutable you will have to trigger a message using pipeTo to be able to interact with the value.

In your sample it likely means that you’d put everything from singleRequest to parsing the response and mapping it to the pair of tokens into a single method that returns a Future containing those two token strings.
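A rough sketch of what such a method could look like, reusing the request and the TokenQueryResult case class from the question. The object name TokenRefreshClient and the method names are made up, the json4s native backend is an assumption (the question does not show which backend is imported), and Akka HTTP 10.2 or later is assumed so that a typed ActorSystem can be passed to Http().

```scala
import akka.actor.typed.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpMethods, HttpRequest}
import org.json4s._
import org.json4s.native.JsonMethods.parse // assumption: json4s-native backend
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical top level object: nothing in here can see the entity's vars.
object TokenRefreshClient {
  final case class TokenQueryResult(refresh_token: String, access_token: String)

  private implicit val formats: Formats = DefaultFormats

  // Pure parsing step, kept separate so it can be tested without HTTP.
  def parseTokens(body: String): TokenQueryResult =
    parse(body).extract[TokenQueryResult]

  // Everything from singleRequest to the parsed tokens, as one Future.
  // The only input from the entity is an immutable String parameter.
  def refreshTokens(refreshToken: String)(
      implicit system: ActorSystem[_]): Future[TokenQueryResult] = {
    implicit val ec: ExecutionContext = system.executionContext

    // Same payload shape as in the question; the token is not JSON-escaped here.
    val dataAsJson = s"""{"tokenData":{"refresh_token":"$refreshToken"}}"""
    val request = HttpRequest(
      method = HttpMethods.POST,
      uri = "http://alpha.website.com/KRSRefreshToken/",
      entity = HttpEntity(ContentTypes.`application/json`, dataAsJson))

    Http()
      .singleRequest(request)
      .flatMap(_.entity.toStrict(2.seconds))
      .map(strict => parseTokens(strict.data.utf8String))
  }
}
```

Because the method lives outside the entity, the compiler will not let the lambdas touch theProcessResultNewAccessToken or any other entity field by accident; the tokens only come back as the value of the returned Future.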

If you need those two tokens inside the actor, that final future is what you’d pipeToSelf as a message.
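As a sketch only, the pipeToSelf step could look roughly like this in an event sourced entity. All names (TokenEntitySketch, Refresh, Refreshed, RefreshFailed, TokensStored) are invented and much simpler than the types in the question, and fetch is a hypothetical function standing in for the method that performs the HTTP call and returns the two tokens.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.pattern.StatusReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
import scala.concurrent.Future
import scala.util.{Failure, Success}

object TokenEntitySketch {
  type Reply = ActorRef[StatusReply[String]]

  sealed trait Command
  final case class Refresh(refreshToken: String, replyTo: Reply) extends Command
  // Internal messages created by pipeToSelf, not meant to be sent from outside.
  private final case class Refreshed(access: String, refresh: String, replyTo: Reply) extends Command
  private final case class RefreshFailed(cause: Throwable, replyTo: Reply) extends Command

  final case class TokensStored(access: String, refresh: String)
  final case class State(access: String, refresh: String)

  // fetch is a hypothetical stand-in for the HTTP call: (access, refresh).
  type Fetch = String => Future[(String, String)]

  def apply(entityId: String, fetch: Fetch): Behavior[Command] =
    Behaviors.setup { context =>
      EventSourcedBehavior[Command, TokensStored, State](
        persistenceId = PersistenceId.ofUniqueId(entityId),
        emptyState = State("EMPTY", "EMPTY"),
        commandHandler = (_, command) => onCommand(context, fetch, command),
        eventHandler = (_, event) => State(event.access, event.refresh))
    }

  private def onCommand(
      context: ActorContext[Command],
      fetch: Fetch,
      command: Command): Effect[TokensStored, State] =
    command match {
      case Refresh(token, replyTo) =>
        // Nothing is persisted yet; the result comes back later as a message.
        context.pipeToSelf(fetch(token)) {
          case Success((access, refresh)) => Refreshed(access, refresh, replyTo)
          case Failure(cause)             => RefreshFailed(cause, replyTo)
        }
        Effect.none

      case Refreshed(access, refresh, replyTo) =>
        // Now the tokens are known, so they can be persisted and replied with.
        Effect
          .persist(TokensStored(access, refresh))
          .thenReply(replyTo)(_ => StatusReply.Success(access))

      case RefreshFailed(cause, replyTo) =>
        Effect.reply(replyTo)(StatusReply.Error(cause))
    }
}
```

The replyTo reference travels inside the internal message, so the original caller only gets an answer after the event carrying the real tokens has been persisted. Note that other commands can be processed between Refresh and Refreshed.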

Also note that you should most certainly not create a new ActorSystem instance for each command handled in your actor. That is expensive, and it will be a resource leak unless you also stop them somewhere. Instead, use the system that the actor is running in through context.system.
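A minimal sketch of picking up the running system instead of creating one. SystemReuseSketch and describe are invented names; describe just stands in for any code that needs an implicit ActorSystem, such as Http().singleRequest.

```scala
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors

object SystemReuseSketch {
  // Hypothetical helper that needs an ActorSystem, like an HTTP call would.
  def describe(implicit system: ActorSystem[_]): String =
    s"running in ${system.name}"

  def apply(): Behavior[String] =
    Behaviors.setup { context =>
      // Reuse the system this actor already runs in; no new ActorSystem(...).
      implicit val system: ActorSystem[_] = context.system

      Behaviors.receiveMessage { msg =>
        context.log.info(s"$describe: $msg")
        Behaviors.same
      }
    }
}
```

For an event sourced entity the same Behaviors.setup wrapper can surround the EventSourcedBehavior, which makes context.system available to the command handler.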

In general a single actor system per JVM is what you want.