I am trying to run the below code but however, it fails to successfully just read from the SQS. The thing am trying to play around is to just have a message in SQS which has my S3 bucket file names to be downloaded. I have the downloading part working individually using Alpakka but the part where I am trying to read from SQS is getting a bit messy. I am completely new to the Reactive programming world so if it sounds very trivial please forgive me, but am looking for some help here! thanks a lot in advance.
P.S: I tried the samples from this github repo: git@github.com:akka/alpakka-samples.git
public class SQSEventHandler {
final static ObjectMapper mapper = new ObjectMapper();
final static ObjectReader fromSqsReader = mapper.readerFor(MessageFromSqs.class);
final static ObjectWriter enrichedMessageWriter = mapper.writerFor(EnrichedMessage.class);
final S3OperationsImpl s3Operations = new S3OperationsImpl();
final ActorSystem system;
final Materializer materializer;
final ActorRef enrichingActor;
final LoggingAdapter log;
public static void main(String[] args) throws Exception {
SQSEventHandler me = new SQSEventHandler();
me.run();
}
public SQSEventHandler() {
system = ActorSystem.create();
log = Logging.getLogger(system, this);
materializer = ActorMaterializer.create(system);
enrichingActor = system.actorOf(Props.create(EnrichActor.class, EnrichActor::new));
}
void run() throws Exception {
// create SQS client
AWSCredentialsProvider credentialsProvider =
new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY));
AmazonSQSAsync sqsClient =
AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentialsProvider)
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(sqsEndpoint, SIGNING_REGION))
.build();
system.registerOnTermination(() -> sqsClient.shutdown());
// configure SQS
SqsSourceSettings settings = SqsSourceSettings.create().withCloseOnEmptyReceive(false);
SqsAckSettings ackSettings = SqsAckSettings.create();
// create running stream
CompletionStage<Done> streamCompletion = SqsSource.create(sourceQueueUrl, settings, sqsClient)
.log("read from SQS", log)
.mapAsync(8, (Message msg) -> {
MessageFromSqs messageFromSqs = fromSqsReader.readValue(msg.getBody());
//now to download the file from S3 by using the information necessary in the message from SQS
Stream.of(s3Operations.fileDownloader(messageFromSqs.bucketName, messageFromSqs.bucketKey, materializer, system))
//NOT sure what to do here for me to download the file and yet delete the message from the SQS
//need help here!!!
})
.map(msg -> MessageAction.delete(msg)
.runWith(
SqsAckSink.create(sourceQueueUrl, ackSettings, sqsClient),
materializer
);
// terminate the actor system when the stream completes (see withCloseOnEmptyReceive)
streamCompletion.thenAccept(done -> system.terminate());
}
//just for reference pasting the src code for fileDownloader
public String fileDownloader(String bucketName, String bucketKey, Materializer materializer, ActorSystem system) throws InterruptedException, TimeoutException, ExecutionException {
final Source<Optional<Pair<Source<ByteString, NotUsed>, ObjectMetadata>>, NotUsed>
sourceAndMeta = S3.download(bucketName, bucketKey);
final Pair<Source<ByteString, NotUsed>, ObjectMetadata> dataAndMetadata = sourceAndMeta
.runWith(Sink.head(), materializer)
.toCompletableFuture()
.get(15, TimeUnit.SECONDS)
.get();
final Source<ByteString, NotUsed> data = dataAndMetadata.first();
final CompletionStage<String> resultCompletionStage =
data.map(ByteString::utf8String).runWith(Sink.head(), materializer);
return resultCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS);
}
}