Alpakka S3 connector for Parquet files

Hi Team

We are stuck and need your help. We are reading a Parquet file from HDFS as a stream of GenericRecords, using the Alpakka ParquetAvro connector:

// Reads a Parquet file from HDFS as a stream of Avro GenericRecords,
// running under the ugi (UserGroupInformation) context with our shared Hadoop config.
override def streamParquetFileAsAvro(filePath: String): Source[GenericRecord, NotUsed] =
  ugi.doAs(new PrivilegedExceptionAction[Source[GenericRecord, NotUsed]] {
    override def run(): Source[GenericRecord, NotUsed] = {
      try {
        val hadoopPath = new HadoopPath(filePath)
        hadoopConfig.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
        val inputFile = HadoopInputFile.fromPath(hadoopPath, hadoopConfig)
        val reader = AvroParquetReader.builder[GenericRecord](inputFile).withConf(hadoopConfig).build()
        AvroParquetSource(reader)
      } catch {
        case ex: Exception =>
          // On read failure, log/handle the error and emit an empty stream instead of failing.
          handleParquetFileReadError(filePath, ex)
          Source.empty[GenericRecord]
      }
    }
  })
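
To rule out the read side, a quick sanity check could simply count the records before any serialization happens. This is only a sketch; it assumes an implicit ActorSystem is in scope for materialization, as all the snippets here do:

import scala.concurrent.Future

// Sketch: materialize the source alone to confirm the Parquet read itself succeeds.
val recordCount: Future[Long] =
  fileHandler.streamParquetFileAsAvro(filePath)
    .runFold(0L)((count, _) => count + 1)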

Now we want to upload the file to S3. The multipart upload to the bucket itself works, but the uploaded file comes out corrupted, and we suspect this happens during serialization:

// Step 1: Read the Parquet file as a Source[GenericRecord, NotUsed]
val parquetSource: Source[GenericRecord, NotUsed] = fileHandler.streamParquetFileAsAvro(filePath)

// Step 2: Flow that converts each GenericRecord to a ByteString
val recordToByteStringFlow: Flow[GenericRecord, ByteString, NotUsed] = Flow[GenericRecord].map(serializeRecord)

// Step 3: Sink that multipart-uploads the bytes to the S3 bucket
val s3key: String = DataSyncUtils.getRelativePath(filePath)
val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = S3.multipartUpload("bg0975-cef-ccmedev-data", s3key)

// Step 4: Run the stream
val uploadResult: Future[MultipartUploadResult] = parquetSource
  .via(recordToByteStringFlow)
  .runWith(s3Sink)

// Serializes a single record using Avro's raw binary encoding.
// Note: this emits only the datum bytes: no schema, no Avro container
// header, and none of Parquet's file structure.
private def serializeRecord(record: GenericRecord): ByteString = {
  val writer = new GenericDatumWriter[GenericRecord](record.getSchema)
  val outputStream = new ByteArrayOutputStream()
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(outputStream, null)
  writer.write(record, encoder)
  encoder.flush()
  ByteString(outputStream.toByteArray)
}

I believe the serialization is what we are doing wrong: concatenating the raw Avro binary encoding of each record cannot produce a valid Parquet file, since the uploaded bytes carry no Parquet magic bytes, row groups, or footer metadata.
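
Since the goal is really a byte-for-byte copy of the Parquet file, we are considering skipping the record round trip entirely and streaming the raw HDFS bytes to S3. A sketch of what we have in mind (streamRawFileBytes is our own hypothetical helper; ugi, hadoopConfig, and s3Sink are the same values as above, and an implicit ActorSystem is assumed):

import java.security.PrivilegedExceptionAction
import akka.NotUsed
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString
import org.apache.hadoop.fs.{FileSystem, Path => HadoopPath}

// Sketch: stream the file's raw bytes so the Parquet structure is preserved as-is.
def streamRawFileBytes(filePath: String): Source[ByteString, NotUsed] =
  ugi.doAs(new PrivilegedExceptionAction[Source[ByteString, NotUsed]] {
    override def run(): Source[ByteString, NotUsed] = {
      val fs = FileSystem.get(hadoopConfig)
      StreamConverters
        .fromInputStream(() => fs.open(new HadoopPath(filePath)))
        .mapMaterializedValue(_ => NotUsed)
    }
  })

val rawUploadResult = streamRawFileBytes(filePath).runWith(s3Sink)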

It's a very basic use case of uploading a Parquet file to S3, so we suspect we are missing something obvious.
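
And in case we ever need to transform the records and re-encode them, the cleanest route we can see is to write a real Parquet file first and upload its bytes afterwards, e.g. via the Alpakka AvroParquetSink and a temporary local file. Again only a sketch: the temp-file handling and the schema value are our assumptions, and an implicit ExecutionContext is needed for the flatMap:

import java.nio.file.Files
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetSink
import akka.stream.scaladsl.FileIO
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

// Sketch: re-encode the records into a real Parquet file, then upload that file.
// schema is the records' Avro schema (an assumption on our side).
val tmp = Files.createTempFile("upload-", ".parquet")
Files.delete(tmp) // the Parquet writer refuses to overwrite an existing file

val writer: ParquetWriter[GenericRecord] = AvroParquetWriter
  .builder[GenericRecord](new HadoopPath(tmp.toUri))
  .withSchema(schema)
  .withConf(hadoopConfig)
  .build()

val reEncodedUpload = parquetSource
  .runWith(AvroParquetSink(writer)) // writes all records and closes the writer
  .flatMap(_ => FileIO.fromPath(tmp).runWith(s3Sink))

Are we on the right track with either approach, or is there a more direct way to do this with the connectors? Any pointers appreciated.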