How do Alpakka and Pub/Sub work?
This topic is based on a conversation between an anonymous user of Akka and our “Ask AI about Akka” bot in the documentation.
Alpakka is a reactive integration library for Java and Scala, built on top of Akka Streams. It provides connectors for various technologies, including Google Cloud Pub/Sub.
Google Cloud Pub/Sub is a many-to-many, asynchronous messaging system that decouples senders and receivers. It allows for secure and highly available communication between independently written applications.
Alpakka provides two connectors for Google Cloud Pub/Sub: one based on the HTTP API and one based on gRPC.
Here is an example of how to set up the HTTP-based connector in Scala:
implicit val system: ActorSystem = ActorSystem()
val config = PubSubConfig()
val topic = "topic1"
val subscription = "subscription1"
And in Java:
ActorSystem system = ActorSystem.create();
PubSubConfig config = PubSubConfig.create();
String topic = "topic1";
String subscription = "subscription1";
Here is an example of how to publish a message with the gRPC connector in Scala:
val projectId = "alpakka"
val topic = "simpleTopic"
val publishMessage: PubsubMessage =
PubsubMessage()
.withData(ByteString.copyFromUtf8("Hello world!"))
val publishRequest: PublishRequest =
PublishRequest()
.withTopic(s"projects/$projectId/topics/$topic")
.addMessages(publishMessage)
val source: Source[PublishRequest, NotUsed] =
Source.single(publishRequest)
val publishFlow: Flow[PublishRequest, PublishResponse, NotUsed] =
GooglePubSub.publish(parallelism = 1)
val publishedMessageIds: Future[Seq[PublishResponse]] = source.via(publishFlow).runWith(Sink.seq)
And in Java:
final String projectId = "alpakka";
final String topic = "simpleTopic";
final PubsubMessage publishMessage =
PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("Hello world!")).build();
final PublishRequest publishRequest =
PublishRequest.newBuilder()
.setTopic("projects/" + projectId + "/topics/" + topic)
.addMessages(publishMessage)
.build();
final Source<PublishRequest, NotUsed> source = Source.single(publishRequest);
To run the example code, you will need to configure a project and pub/sub in Google Cloud and provide your own credentials.
How does grpc and http differ?
gRPC and HTTP are both protocols used for communication between services in a distributed system. However, they differ in several ways (source):
I want to subscribe to messages with high performance, source control, and backpressure.
To subscribe to messages with high performance and source control, you can use Akka Streams with backpressure. Akka Streams is a library that is designed to process and transfer a sequence of elements using bounded buffer space. It implements backpressure, which is a form of flow control to ensure that fast producers do not overwhelm slow consumers.
Here is an example of how to use Akka Streams with backpressure in Scala, using Alpakka’s MQTT connector:
val mqttSource: Source[MqttMessage, Future[Done]] =
MqttSource.atMostOnce(
connectionSettings.withClientId(clientId = "source-spec/source"),
MqttSubscriptions(Map(topic1 -> MqttQoS.AtLeastOnce, topic2 -> MqttQoS.AtLeastOnce)),
bufferSize = 8
)
val (subscribed, streamResult) = mqttSource
.take(messages.size)
.toMat(Sink.seq)(Keep.both)
.run()
In this example, MqttSource.atMostOnce creates a source that connects to the MQTT server and receives messages from the subscribed topics. The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies. This means that if the source produces messages faster than they can be consumed, it will start to slow down once the buffer is full, preventing the consumer from being overwhelmed (source).
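The effect of a bounded buffer can be sketched in plain Java without any Akka dependency. This is only an illustration of the idea, not how the MQTT connector is implemented; the class and method names are hypothetical. A full buffer refuses new elements, which is the signal a producer uses to slow down, and it accepts elements again once the consumer has taken one.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical illustration; not part of Alpakka or the MQTT connector.
class BoundedBufferSketch {

    // Offers `attempts` elements to a buffer of the given capacity while no
    // consumer is draining it, and returns how many were accepted.
    static int acceptedWithoutConsumer(int capacity, int attempts) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false instead of growing the buffer when it is full.
            if (buffer.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    // Fills the buffer, lets a consumer take one element, and reports whether
    // the producer was rejected while full and accepted again afterwards.
    static boolean producerResumesAfterPoll(int capacity) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            buffer.offer(i);
        }
        boolean rejectedWhileFull = !buffer.offer(-1);
        buffer.poll(); // the consumer frees one slot
        boolean acceptedAfterPoll = buffer.offer(-1);
        return rejectedWhileFull && acceptedAfterPoll;
    }

    public static void main(String[] args) {
        System.out.println(acceptedWithoutConsumer(8, 10)); // prints 8
        System.out.println(producerResumesAfterPoll(8));    // prints true
    }
}
```

With a capacity of 8, as in the bufferSize above, the ninth and tenth elements are refused until the consumer catches up.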
For Google Cloud Pub/Sub, you can use Alpakka’s Google Cloud Pub/Sub connector:
val subscriptionSource: Source[ReceivedMessage, Cancellable] =
GooglePubSub.subscribe(subscription, config)
val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
GooglePubSub.acknowledge(subscription, config)
subscriptionSource
.map { message =>
// do something fun
message.ackId
}
.groupedWithin(1000, 1.minute)
.map(AcknowledgeRequest.apply)
.to(ackSink)
In this example, GooglePubSub.subscribe creates a source that pulls messages from a Pub/Sub subscription. The ackSink is a sink that acknowledges the received messages, signaling that they have been processed and can be removed from the Pub/Sub system. The groupedWithin operator batches the acknowledgements, closing a batch after 1000 ack IDs or one minute, whichever comes first, which reduces the number of acknowledge requests (source).
Remember that to run these examples, you will need to configure your MQTT server or Google Cloud Pub/Sub and provide your own credentials.
I need the code in Java.
Sure, here is an example of how to use Akka Streams with backpressure in Java, using Alpakka’s Google Cloud Pub/Sub connector:
import akka.Done;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.alpakka.googlecloud.pubsub.javadsl.GooglePubSub;
import akka.stream.alpakka.googlecloud.pubsub.AcknowledgeRequest;
import akka.stream.alpakka.googlecloud.pubsub.PubSubConfig;
import akka.stream.alpakka.googlecloud.pubsub.ReceivedMessage;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
ActorSystem system = ActorSystem.create();
PubSubConfig config = PubSubConfig.create();
String subscription = "subscription1";
// Assumption: two-argument overloads, mirroring the Scala example above; check your Alpakka version.
// The materialized value is a Cancellable, as in the Scala example.
Source<ReceivedMessage, Cancellable> subscriptionSource =
GooglePubSub.subscribe(subscription, config);
Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink =
GooglePubSub.acknowledge(subscription, config);
subscriptionSource
.map(message -> {
// do something with the message
return message.ackId();
})
.groupedWithin(1000, Duration.ofMinutes(1))
.map(AcknowledgeRequest::create)
.to(ackSink);
In this example, GooglePubSub.subscribe creates a source that pulls messages from a Pub/Sub subscription. The ackSink is a sink that acknowledges the received messages, signaling that they have been processed and can be removed from the Pub/Sub system. The groupedWithin operator batches the acknowledgements, closing a batch after 1000 ack IDs or one minute, whichever comes first, which reduces the number of acknowledge requests (source).
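To make the batching rule concrete, here is a rough plain-Java sketch of grouping ack IDs by size or elapsed time. It is a simplification and not the Akka Streams implementation: the class name, method name, and inputs are hypothetical, the time window is assumed to start when the first element of a batch arrives, and the time limit is only checked when the next element arrives, whereas the real operator is driven by a timer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of size-or-time batching; not Akka's groupedWithin.
class AckBatchingSketch {

    // Groups ack IDs into batches of at most maxSize elements, closing a batch
    // early when windowMillis has elapsed since its first element arrived.
    static List<List<String>> group(
            String[] ackIds, long[] arrivalMillis, int maxSize, long windowMillis) {
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be at least 1");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long windowStart = 0;
        for (int i = 0; i < ackIds.length; i++) {
            // Time limit: close the open batch if its window has elapsed.
            if (!current.isEmpty() && arrivalMillis[i] - windowStart >= windowMillis) {
                batches.add(current);
                current = new ArrayList<>();
            }
            if (current.isEmpty()) {
                windowStart = arrivalMillis[i];
            }
            current.add(ackIds[i]);
            // Size limit: close the batch as soon as it is full.
            if (current.size() == maxSize) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        // End of input: flush whatever is left.
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        String[] ids = {"a", "b", "c", "d", "e"};
        long[] times = {0, 10, 20, 2000, 2010};
        System.out.println(group(ids, times, 2, 1000)); // prints [[a, b], [c], [d, e]]
    }
}
```

In the example, the first and last batches close because they reach two elements, while the middle batch closes because more than a second passed before the next ID arrived.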
Remember that to run this example, you will need to configure your Google Cloud Pub/Sub and provide your own credentials.
Where is the resource buffer and back pressure handled and restart?
In Akka, resource buffering and backpressure are handled in the Akka Streams library. Akka Streams implements an asynchronous non-blocking back-pressure protocol standardized by the Reactive Streams specification. The user of the library does not have to write any explicit back-pressure handling code; it is built in and dealt with automatically by all of the provided Akka Streams operators (source).
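The demand signalling behind that protocol can be illustrated with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces. This is a minimal sketch using the JDK's SubmissionPublisher rather than Akka Streams, and the class names are hypothetical. The subscriber asks for one element at a time, so the publisher never delivers more than the subscriber has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration using the JDK Flow API; not Akka Streams code.
class DemandSketch {

    // A subscriber that signals demand for exactly one element at a time.
    static final class OneAtATime implements Flow.Subscriber<Integer> {
        private final List<Integer> received = new ArrayList<>();
        final CompletableFuture<List<Integer>> done = new CompletableFuture<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next element only when ready
        }

        @Override
        public void onError(Throwable throwable) {
            done.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            done.complete(received);
        }
    }

    // Publishes 1..count and returns what the subscriber received.
    static List<Integer> collect(int count) {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                // submit() blocks while the subscriber's buffer is full.
                publisher.submit(i);
            }
        } // close() signals onComplete after the buffered items are delivered
        return subscriber.done.orTimeout(5, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        System.out.println(collect(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

Akka Streams operators perform this request and delivery exchange internally, which is why no explicit handling code is needed in user code.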
Here is an example of how to use Akka Streams with backpressure in Java:
tweets
.buffer(10, OverflowStrategy.dropHead())
.map(t -> slowComputation(t))
.runWith(Sink.ignore(), system);
In this example, the buffer operator adds a buffer of 10 elements. OverflowStrategy.dropHead() determines what happens when that buffer is full: instead of backpressuring the upstream, the oldest element in the buffer is dropped to make room for the new one (source).
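The drop-oldest behavior can be shown with a small plain-Java buffer. This is a sketch of the semantics only, with a hypothetical class name, and it does not reflect how Akka Streams implements its buffer operator.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of drop-oldest overflow handling; not Akka code.
class DropHeadBufferSketch<T> {
    private final int capacity;
    private final ArrayDeque<T> buffer = new ArrayDeque<>();

    DropHeadBufferSketch(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    // Adds an element, discarding the oldest one first if the buffer is full.
    void offer(T element) {
        if (buffer.size() == capacity) {
            buffer.pollFirst(); // drop the head (oldest element)
        }
        buffer.addLast(element);
    }

    // Returns the buffered elements from oldest to newest.
    List<T> snapshot() {
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        DropHeadBufferSketch<Integer> buffer = new DropHeadBufferSketch<>(3);
        for (int i = 1; i <= 5; i++) {
            buffer.offer(i);
        }
        System.out.println(buffer.snapshot()); // prints [3, 4, 5]
    }
}
```

The producer is never slowed down here; elements 1 and 2 are simply lost, which is the trade-off this strategy makes.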
As for restarts, they are handled in Akka’s fault tolerance model. When an actor fails (throws an exception), it is suspended and its supervisor is informed of the failure. The supervisor then decides what to do, which could be to restart the actor. Here is an example of how to handle restarts in Java:
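// Opening of the snippet reconstructed from its closing lines.
// Resource and claimResource() are hypothetical names for the guarded resource.
return Behaviors.supervise(
Behaviors.<String>setup(
ctx -> {
final Resource resource = claimResource();
return Behaviors.receive(String.class)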
.onMessage(
String.class,
msg -> {
// message handling that might throw an exception
String[] parts = msg.split(" ");
resource.process(parts);
return Behaviors.same();
})
.onSignal(
PreRestart.class,
signal -> {
resource.close();
return Behaviors.same();
})
.onSignal(
PostStop.class,
signal -> {
resource.close();
return Behaviors.same();
})
.build();
}))
.onFailure(Exception.class, SupervisorStrategy.restart());
In this example, the SupervisorStrategy.restart() strategy is used, which means that if the actor throws an exception, it will be restarted (source).
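The restart cycle can be approximated in plain Java to show the order of events. This is a loose sketch with hypothetical names, not how Akka implements supervision: a failing message is dropped, the old resource is closed (the role of the PreRestart handler), a fresh resource is created, and processing continues with the next message.

```java
import java.util.List;

// Hypothetical illustration of restart-on-failure; not Akka's supervision code.
class RestartSketch {

    // Stand-in for the resource held by the actor.
    static final class Resource implements AutoCloseable {
        void process(String[] parts) {
            if (parts.length < 2) {
                throw new IllegalArgumentException("expected at least two words");
            }
        }

        @Override
        public void close() {
            // Release whatever the resource holds.
        }
    }

    // Processes all messages and returns how many restarts were needed.
    static int processAll(List<String> messages) {
        int restarts = 0;
        Resource resource = new Resource();
        for (String message : messages) {
            try {
                resource.process(message.split(" "));
            } catch (RuntimeException failure) {
                resource.close();          // cleanup before the restart
                resource = new Resource(); // start again with fresh state
                restarts++;                // the failing message is not retried
            }
        }
        resource.close(); // cleanup when processing stops
        return restarts;
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of("a b", "oops", "c d"))); // prints 1
    }
}
```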