Simple UDP Server with Akka Streams and Alpakka UDP

I’m very new to Akka (and to functional-style operators like map) and am struggling to simply print the UDP datagrams received from a UDP flow. I’ve looked at the few UDP examples out there, but I think my difficulty is that binding to a UDP port gives me a Flow of Datagrams rather than a plain Source, so I’m having trouble converting the datagrams to Strings that I can print. I’m trying to hook the Flow up to a println Sink but am obviously missing something important.

public class UDPServer {

  public static void main(String[] argv) {

    System.out.println("Running main method...");
    final ActorSystem system = ActorSystem.create("udp-server");
    final ActorMaterializer mat = ActorMaterializer.create(system);

    final InetSocketAddress socket = new InetSocketAddress("127.0.0.1", 8888);
    final Flow<Datagram, Datagram, CompletionStage<InetSocketAddress>> udpBindFlow = Udp.bindFlow(socket, system);
    System.out.println("Server bound to Port 8888");

    // connect my flow to a sink here

    // udpBindFlow.to(Sink.foreach(System.out::println)); // runs but doesn't do anything
    udpBindFlow.to(Sink.foreach(dg -> System.out.println(dg.toString()))); // also runs but doesn't output anything

    // do something else with my datagrams

  } // end main

} // end UDPServer

After some thought and trial and error, I finally figured it out. I’m posting the answer because it might help others, since Alpakka UDP doesn’t have many examples. My original code never ran anything: calling to(...) on a Flow only returns a new Sink, and nothing happens until a complete graph with a Source at one end is run. The trick is to use Source.maybe() to mute the inlet of the UDP flow if you’re not sending outbound UDP from your server. I learned that trick from some of the discussions on WebSockets.

import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.alpakka.udp.Datagram;
import akka.stream.alpakka.udp.javadsl.Udp;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class UDPStreamServer {

  public static void main(String[] argv) {

    System.out.println("Running main method...");
    final ActorSystem system = ActorSystem.create("udp-server");
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final InetSocketAddress socket = new InetSocketAddress("127.0.0.1", 8888);

    // For UDP, binding gives a flow rather than a source: its inlet takes the
    // datagrams to send out, its outlet emits the datagrams received on the socket
    final Flow<Datagram, Datagram, CompletionStage<InetSocketAddress>> bindFlow =
        Udp.bindFlow(socket, system);

    // Could create a source of datagrams to send out through the flow inlet, but
    // it isn't used here, so mute it: Source.maybe() emits nothing and stays open
    // until its materialized future is completed
    final Source<Datagram, CompletableFuture<Optional<Datagram>>> ignoreSource = Source.maybe();

    // Sink for the outlet of the UDP flow; using println here, but it could also
    // be a Kafka or Pulsar sink from Alpakka
    final Sink<String, CompletionStage<Done>> printSink =
        Sink.foreach(text -> System.out.println("Data received: " + text));

    final RunnableGraph<CompletableFuture<Optional<Datagram>>> runnable =
        ignoreSource
            .via(bindFlow)
            .map(dg -> dg.getData().utf8String()) // decode the payload, assuming UTF-8 text
            .to(printSink);

    // Nothing is bound or received until the graph is actually run
    runnable.run(materializer);

    /* example of sending UDP works great

    final int messagesToSend = 100;

    Source.range(1, messagesToSend)
        .map(i -> ByteString.fromString("Message #" + i))
        .map(bs -> Datagram.create(bs, socket))
        .runWith(Udp.sendSink(system), materializer);

    System.out.println("Sending datagrams to socket...");

    */

  } // end main

} // end UDPStreamServer
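
For anyone who wants to check that the server really receives something, here’s a minimal sketch of a test client that uses only the plain JDK (no Akka involved). The class name UdpTestClient and its send helper are names I made up for illustration, and it assumes the server above is already running and bound to 127.0.0.1:8888.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper for manual testing; not part of Akka or Alpakka.
class UdpTestClient {

  // Sends each message as one UTF-8 encoded datagram and returns how many were sent.
  static int send(String host, int port, List<String> messages) throws IOException {
    InetAddress address = InetAddress.getByName(host);
    // Let the OS pick an ephemeral local port; the socket is closed when the block ends.
    try (DatagramSocket socket = new DatagramSocket()) {
      int sent = 0;
      for (String message : messages) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        socket.send(new DatagramPacket(payload, payload.length, address, port));
        sent++;
      }
      return sent;
    }
  }

  public static void main(String[] args) throws IOException {
    // Assumption: the UDP server is listening on 127.0.0.1:8888.
    int sent = send("127.0.0.1", 8888, List.of("Message #1", "Message #2", "Message #3"));
    System.out.println("Sent " + sent + " datagrams");
  }
}
```

Keep in mind that UDP gives no delivery feedback: the client reports how many datagrams it handed to the OS, not how many the server saw, so the real check is whether the server console prints the messages.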

Thanks for posting what you’ve learned, @dlmutart! I edited your posts to improve the formatting of the source code. Here’s a guide to the formatting syntax for this forum (similar to GitHub): https://commonmark.org/help/
