Porting over to Akka Persistence

In my codebase, I use a cluster singleton configured via ClusterSingletonManagerSettings.

All my actors currently extend UntypedActor and I would like to port all the actors to persistence. After reading the sample docs, is this the right general strategy to move forward?

  1. All actors should extend AbstractPersistentActor
  2. I need to implement public String persistenceId()
  3. I need to implement Event Sourcing where I create classes where the state of an actor can be serialized to something (either to a file or some plug-in)
  4. Code has to be refactored for createReceive() where when an actor receives a message, the state is immediately persisted
  5. If the system goes down, then the actor replays messages from a file (or however the data was saved) to restore the state of the actor.

I recently inherited an Akka project, so I apologize for asking dumb questions.

Sounds about right.

In general, “all actors should extend AbstractPersistentActor” sounds a little bit dangerous. You should determine case by case whether it makes sense for the actor to be persistent, and then whether event sourcing really is a good fit. In some cases another form of storing state may make more sense: for example, just writing a snapshot of the state every now and then and loading it on startup, or using Distributed Data to ensure the state is shared across nodes if one crashes.

Thanks again for your comments, here is a little more context. There is currently a MasterActor (an actor with a single instance, launched in the cluster singleton) that keeps in-memory data structures (hashmaps) that help with some business logic. When Akka goes down, those hashmaps get blown away.

**Option 1: Event Sourcing**
There is a “MasterActor” that extends AbstractPersistentActor and uses event sourcing. When messages are replayed, the hashmap is restored.

**Option 2: Snapshot**
Intermittently write a snapshot of the state.

**Option 3:** Instead of persisting to a journal, serialize all of the objects out. On recovery, load the serialized file.

**Option 4:** Use Distributed Data to ensure the state is shared across the nodes.

Couple questions:

  1. Given that I have several hashmaps which don’t exactly fit the event sourcing paradigm, I was thinking of writing the internal hashmaps out to a serialized file. When Master restarts, createReceiveRecover can load the serialized hashmap from the file. Would this be a viable solution?
  2. Is it possible to remove messages from the journal which have been persisted? The system has a concept where messages have different states and replaying messages which have been processed would lead to duplication.

Thanks!

  1. If you’re going to manage the state through your own mechanism, why bother with implying you’re using event sourcing by subclassing AbstractPersistentActor? Load the serialized hashmap in the preStart() method.

  2. Yes, you can remove messages from the journal. The canonical example is deleting past events on a SaveSnapshotSuccess message.
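
To illustrate why deleting events after a successful snapshot avoids duplicate replay, here is a minimal plain-Java model of the idea. It is not Akka's API: `ToyJournal` and all of its members are hypothetical names, loosely mirroring `saveSnapshot(...)`, `deleteMessages(toSequenceNr)`, and the sequence number carried by a snapshot. In a real AbstractPersistentActor the deletion would typically be triggered from the handler for SaveSnapshotSuccess.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory journal: a model of the idea, not Akka's API. */
final class ToyJournal {
    /** One persisted event: a sequence number plus a key/value update. */
    static final class Event {
        final long sequenceNr;
        final String key;
        final String value;

        Event(long sequenceNr, String key, String value) {
            this.sequenceNr = sequenceNr;
            this.key = key;
            this.value = value;
        }
    }

    private final List<Event> events = new ArrayList<>();
    private Map<String, String> snapshot = new LinkedHashMap<>();
    private long snapshotSequenceNr = 0;
    private long nextSequenceNr = 1;

    /** Appends an event to the journal. */
    void persist(String key, String value) {
        events.add(new Event(nextSequenceNr++, key, value));
    }

    /** Stores a copy of the state and remembers the last event it covers. */
    void saveSnapshot(Map<String, String> state) {
        snapshot = new LinkedHashMap<>(state);
        snapshotSequenceNr = nextSequenceNr - 1;
    }

    /** Drops every event with sequenceNr <= toSequenceNr. */
    void deleteMessages(long toSequenceNr) {
        events.removeIf(e -> e.sequenceNr <= toSequenceNr);
    }

    long snapshotSequenceNr() {
        return snapshotSequenceNr;
    }

    int journalSize() {
        return events.size();
    }

    /** Recovery: start from the snapshot, then replay only newer events. */
    Map<String, String> recover() {
        Map<String, String> state = new LinkedHashMap<>(snapshot);
        for (Event e : events) {
            if (e.sequenceNr > snapshotSequenceNr) {
                state.put(e.key, e.value);
            }
        }
        return state;
    }
}
```

After `saveSnapshot(state)` followed by `deleteMessages(snapshotSequenceNr())`, recovery only sees the snapshot plus events persisted later, so already-processed messages are not replayed.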

Thanks for the comment. I was able to serialize this class out (Work gets passed into onMessage()):

public static final class Work implements Serializable {

        private static final long serialVersionUID = 1L;

        private final String workId;

        private final ActorRef originalSender;
}

And I am able to serialize it out with the ordinary serializer

       FileOutputStream fileOutputStream = new FileOutputStream(filePath);
       ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream);
       objectOutputStream.writeObject(map); // map of Work objects
       objectOutputStream.flush();
       objectOutputStream.close();

The problem is when I try to deserialize it in preStart():

public void preStart() {
              //initialization code
              //unserialize code
	      LinkedHashMap<String,Work> lhm = (LinkedHashMap<String,Work>) 

I get

 Trying to deserialize a serialized ActorRef without an ActorSystem in scope. Use 
akka.serialization.Serialization.currentSystem.withValue(system) { ... }
  1. After reading the docs and doing some testing, is this the correct way of manually serializing and deserializing an ActorRef?

        /* Check to see the default value of the actor */
        Serialization serialization = SerializationExtension.get(system);

        /* This will serialize out to a string */
        String identifier = Serialization.serializedActorPath(persistentActor);

        /* This will restore the ref properly */
        final ActorRef deserializedActorRef = system.provider().resolveActorRef(identifier);

  2. To get access to the current ActorSystem in preStart(), is it correct to call context()?

        String testIdentifier = "akka://example/user/persistentActor-4-java8#-173846886";
        final ActorRef deserializedActorRef = context().provider().resolveActorRef(testIdentifier);

  3. According to the docs, there is an alternative way to serialize/deserialize by running the code inside this block:
    https://doc.akka.io/japi/akka/current/akka/serialization/JavaSerializer.html#currentSystem--

        JavaSerializer.currentSystem.withValue(system) {

        }

The problem is that when I type
akka.serialization.Serialization.currentSystem.withValue(system)
or JavaSerializer.currentSystem

none of these API options show up in my code.

I appreciate in advance all the help everyone has contributed!

You didn’t mention you were trying to persist ActorRefs. Serialized actor references don’t do what you expect them to – they’re just references to the actor cell containing the actor, and don’t contain any information about the actor itself.

If, when you’re recovering, the senders will have already been created by some other process, then instead of serializing the ActorRef, use toString() on the ref to get the path, persist the path, and rebuild the actor ref with system.actorFor(path).

If the senders have not been created by some other process, you’ll have to create new instances of the actors. Unless you know more about them than you’re showing here, there’s not enough information to do that.
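
A minimal sketch of the "persist the path, not the ref" idea, using only the JDK. `StoredWork` and `WorkStore` are hypothetical names, and `StoredWork` is an assumed variant of the Work class above that keeps the sender's path as a String. Turning that string back into an ActorRef is left to Akka (for example via the `resolveActorRef` call shown earlier in the thread). One caveat, as far as I know: `ActorRef.toString()` wraps the path as `Actor[...]`, so `Serialization.serializedActorPath(ref)` is probably the safer source for the string.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;

/** Assumed variant of Work: holds the sender's path string, not an ActorRef. */
final class StoredWork implements Serializable {
    private static final long serialVersionUID = 1L;

    final String workId;
    final String originalSenderPath;

    StoredWork(String workId, String originalSenderPath) {
        this.workId = workId;
        this.originalSenderPath = originalSenderPath;
    }
}

/** Plain Java serialization round trip; no actor system is needed. */
final class WorkStore {
    static byte[] toBytes(LinkedHashMap<String, StoredWork> map) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // try-with-resources flushes and closes the stream before we read the buffer
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(map);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static LinkedHashMap<String, StoredWork> fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (LinkedHashMap<String, StoredWork>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because the map now contains only strings, it deserializes in preStart() without any ActorSystem in scope. Swapping the byte-array streams for FileOutputStream and FileInputStream gives the file-based version used earlier in the thread.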

Thanks Rob, so to confirm, the technique below doesn’t work?

JavaSerializer.currentSystem.withValue(system) { }

(I can’t manage to wrap my head around how this works using regular serialization)
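
On how a "system in scope" can work with regular Java serialization: the `withValue(system) { ... }` block form is Scala syntax, which would explain why it does not show up in Java code. From Java, the Javadoc linked above is the place to check for a Callable-style overload. The underlying mechanism can be sketched with the JDK alone. Java's `readResolve()` hook takes no arguments, so a deserialized reference has to find its system through ambient, thread-bound state. Everything below (`ToySystem`, `ToyRef`, `CurrentSystem`, `RefCodec`) is a hypothetical stand-in, not Akka's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for an actor system: a registry of live refs by path. */
final class ToySystem {
    private final Map<String, ToyRef> refs = new HashMap<>();

    ToyRef create(String path) {
        ToyRef ref = new ToyRef(path);
        refs.put(path, ref);
        return ref;
    }

    ToyRef resolve(String path) {
        return refs.get(path);
    }
}

/** Holds the "system in scope" for the current thread only. */
final class CurrentSystem {
    private static final ThreadLocal<ToySystem> CURRENT = new ThreadLocal<>();

    static ToySystem get() {
        return CURRENT.get();
    }

    /** Runs body with system in scope, then restores the previous value. */
    static <T> T withValue(ToySystem system, Supplier<T> body) {
        ToySystem previous = CURRENT.get();
        CURRENT.set(system);
        try {
            return body.get();
        } finally {
            CURRENT.set(previous);
        }
    }
}

/** Hypothetical stand-in for an ActorRef: only the path is written out. */
final class ToyRef implements Serializable {
    private static final long serialVersionUID = 1L;
    final String path;

    ToyRef(String path) {
        this.path = path;
    }

    /** Called by Java serialization after reading; swaps in the live ref. */
    private Object readResolve() throws ObjectStreamException {
        ToySystem system = CurrentSystem.get();
        if (system == null) {
            throw new InvalidObjectException("no ToySystem in scope");
        }
        return system.resolve(path);
    }
}

/** Round-trip helpers that wrap checked exceptions for brevity. */
final class RefCodec {
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling `RefCodec.fromBytes(data)` on its own fails, much like the error quoted above, while `CurrentSystem.withValue(system, () -> RefCodec.fromBytes(data))` returns the live ref registered under that path. Even with a system in scope, the ref only resolves to something useful if an actor actually exists at that path after restart.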