I am learning Akka with Java. I have written a simple program with two actors.
My first actor, ActorA, receives a list containing 1000 strings. ActorA loops through the list and sends a message to ActorB for each element.
ActorB makes an HTTP POST call to an external service using the string received from ActorA.
I expect ActorB to make 1000 HTTP POST calls and to receive an equal number of responses. However, ActorB makes only somewhere between 80 and 120 POST requests (the exact number varies from run to run) and then stops making POST calls.
I tried providing a custom dispatcher, because I treated the HTTP POST call as a blocking operation, but that did not help.
The code and configuration are given below.
public class ActorA extends AbstractActor {

    static public Props props() {
        return Props.create(ActorA.class);
    }

    static public class IdWrapper {
        List<String> ids;

        public IdWrapper(List<String> ids) {
            this.ids = ids;
        }
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(IdWrapper.class, this::process)
                .build();
    }

    private void process(IdWrapper msg) {
        msg.ids.forEach(id -> {
            context().actorSelection("actorB").tell(new MessageForB(id), ActorRef.noSender());
        });
    }
}
public class ActorB extends AbstractActor {

    final Http http = Http.get(getContext().system());
    final Materializer materializer = ActorMaterializer.create(context());

    public static Props props() {
        return Props.create(ActorB.class);
    }

    static public class MessageForB implements Serializable {
        String id;

        public MessageForB(String id) {
            this.id = id;
        }
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(MessageForB.class, this::process)
                .build();
    }

    private void process(MessageForB MessageForB) {
        ExecutionContext ec = getContext().getSystem().dispatchers().lookup("my-blocking-dispatcher");
        /**
         * Get id from request
         */
        String reqId = MessageForB.id;
        /**
         * Prepare request
         */
        XmlRequest requestEntity = getRequest(Stream.of(reqId).collect(Collectors.toList()));
        String requestAsString = null;
        try {
            /**
             * Create and configure JAXBMarshaller.
             */
            JAXBContext jaxbContext = JAXBContext.newInstance(XmlRequest.class);
            Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
            jaxbMarshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
            /**
             * Convert request entity to string before making POST request.
             */
            StringWriter sw = new StringWriter();
            jaxbMarshaller.marshal(requestEntity, sw);
            requestAsString = sw.toString();
        } catch (JAXBException e) {
            e.printStackTrace();
        }
        /**
         * Create RequestEntity from request string.
         */
        RequestEntity entity = HttpEntities.create(
                MediaTypes.APPLICATION_XML.toContentType(HttpCharsets.ISO_8859_1),
                requestAsString);
        /**
         * Create Http POST with necessary headers and call
         */
        final CompletionStage<HttpResponse> responseFuture =
                http.singleRequest(HttpRequest.POST("http://{hostname}:{port}/path")
                        .withEntity(entity));
        responseFuture
                .thenCompose(httpResponse -> {
                    /**
                     * Convert response into String
                     **/
                    final CompletionStage<String> res = Unmarshaller.entityToString()
                            .unmarshal(httpResponse.entity(), ec, materializer);
                    /**
                     * Consume response bytes
                     **/
                    httpResponse.entity().getDataBytes().runWith(Sink.ignore(), materializer);
                    return res;
                })
                .thenAccept(s -> {
                    try {
                        /**
                         * Deserialize string to DTO.
                         */
                        MyResponse MyResponse = getMyResponse(s);
                        // further processing..
                    } catch (JAXBException e) {
                        e.printStackTrace();
                    }
                });
    }

    private XmlRequest getRequest(List<String> identifiers) {
        XmlRequest request = new XmlRequest();
        // Business logic to create req entity
        return request;
    }

    private MyResponse getMyResponse(String s) throws JAXBException {
        JAXBContext jaxbContext = JAXBContext.newInstance(MyResponse.class);
        javax.xml.bind.Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
        StringReader reader = new StringReader(s);
        return (MyResponse) jaxbUnmarshaller.unmarshal(reader);
    }
}
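Note that the thenCompose/thenAccept chain in ActorB has no failure branch, so a request whose CompletionStage completes exceptionally would produce no output at all. Whether that is what happens here is not established. The following is only a minimal standard-library sketch (no Akka involved) of how successes and failures could be counted to find out. The names `OutcomeCounter` and `fakePost`, and the limit of 100 successful calls, are hypothetical stand-ins for `http.singleRequest` and the real service.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

// Standard-library sketch: a CompletionStage chain without a failure branch
// silently skips failed calls, while whenComplete sees both outcomes.
final class OutcomeCounter {

    // Hypothetical stand-in for http.singleRequest: the first `limit` calls
    // succeed and every later call fails. The limit is made up for the demo.
    static CompletionStage<String> fakePost(int index, int limit) {
        if (index < limit) {
            return CompletableFuture.completedFuture("response-" + index);
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(
                new IllegalStateException("request " + index + " rejected"));
        return failed;
    }

    // Returns {handled, succeeded, failed} for `total` simulated requests.
    static int[] run(int total, int limit) {
        AtomicInteger handled = new AtomicInteger();   // what thenAccept alone observes
        AtomicInteger succeeded = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();

        for (int i = 0; i < total; i++) {
            CompletionStage<String> response = fakePost(i, limit);

            // Same shape as the chain in ActorB: not invoked when the stage failed.
            response.thenAccept(body -> handled.incrementAndGet());

            // whenComplete is invoked for success and for failure.
            response.whenComplete((body, error) -> {
                if (error != null) {
                    failed.incrementAndGet();
                } else {
                    succeeded.incrementAndGet();
                }
            });
        }
        return new int[] {handled.get(), succeeded.get(), failed.get()};
    }

    public static void main(String[] args) {
        int[] counts = run(1000, 100);
        // prints "handled=100 succeeded=100 failed=900"
        System.out.println("handled=" + counts[0]
                + " succeeded=" + counts[1]
                + " failed=" + counts[2]);
    }
}
```

In ActorB the equivalent would be to attach a whenComplete (or exceptionally) step to responseFuture and log the error, so that any failed request becomes visible instead of disappearing.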
my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    core-pool-size-min = 5
    core-pool-size-max = 20
  }
  throughput = 1
}
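For reference, the number of threads this dispatcher ends up with depends on the machine. As far as I understand, Akka derives the core pool size as ceil(available processors times core-pool-size-factor), clamped between core-pool-size-min and core-pool-size-max. The sketch below reproduces only that arithmetic in plain Java. The factor of 3.0 is an assumption: it is not set in the configuration above and is taken to be the inherited default. `CorePoolSize` and `scaled` are hypothetical names, not Akka API.

```java
// Sketch of the min / factor / max arithmetic for a thread-pool-executor.
// Assumption: core-pool-size-factor is 3.0 because the config does not override it.
final class CorePoolSize {

    // Scale the processor count by the factor, round up, then clamp to [min, max].
    static int scaled(int min, double factor, int max, int processors) {
        int scaledByCpu = (int) Math.ceil(processors * factor);
        return Math.min(Math.max(scaledByCpu, min), max);
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        // Values 5 and 20 come from my-blocking-dispatcher; 3.0 is the assumed factor.
        System.out.println("core pool size: " + scaled(5, 3.0, 20, processors));
    }
}
```

Under these assumptions a 4-core machine would get 12 threads, and anything with 7 or more cores would be capped at 20.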
What should I change or correct in my code so that ActorB successfully makes an HTTP POST call for every item sent by ActorA?