Hi Team,
We are stuck and need your help. We are reading a Parquet file from HDFS using the Alpakka Avro Parquet connector:
override def streamParquetFileAsAvro(filePath: String): Source[GenericRecord, NotUsed] = {
  ugi.doAs(new PrivilegedExceptionAction[Source[GenericRecord, NotUsed]] {
    override def run(): Source[GenericRecord, NotUsed] = {
      try {
        val hadoopPath = new HadoopPath(filePath)
        // Read the Parquet file in parquet-avro compatibility mode.
        hadoopConfig.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
        val inputFile = HadoopInputFile.fromPath(hadoopPath, hadoopConfig)
        val reader = AvroParquetReader.builder[GenericRecord](inputFile).withConf(hadoopConfig).build()
        // Alpakka source that emits one GenericRecord per Parquet row.
        AvroParquetSource(reader)
      } catch {
        case ex: Exception =>
          handleParquetFileReadError(filePath, ex)
          Source.empty[GenericRecord]
      }
    }
  })
}
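For context, the read side can be sanity-checked on its own by simply counting records (just a sketch; it assumes an implicit ActorSystem named system is in scope for materialization):

// Sketch: count the records emitted by the Parquet source, to confirm the
// HDFS read works independently of the S3 upload.
// Assumes an implicit ActorSystem `system` (Akka 2.6+) is in scope.
import akka.stream.scaladsl.Sink
import org.apache.avro.generic.GenericRecord
import scala.concurrent.Future

val recordCount: Future[Long] =
  fileHandler
    .streamParquetFileAsAvro(filePath)
    .runWith(Sink.fold(0L)((count, _: GenericRecord) => count + 1))

recordCount.foreach(n => println(s"Read $n records from $filePath"))(system.dispatcher)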
Now we want to upload it to S3. The multipart upload to the S3 bucket itself works, but the uploaded file comes out corrupted, which we think happens at serialization time:
// Step 2: Read the Parquet file as a Source[GenericRecord, NotUsed]
val parquetSource: Source[GenericRecord, NotUsed] = fileHandler.streamParquetFileAsAvro(filePath)

// Step 3: Flow that converts each GenericRecord to a ByteString
val recordToByteStringFlow: Flow[GenericRecord, ByteString, NotUsed] = Flow[GenericRecord].map(serializeRecord)

// Step 4: Sink that multipart-uploads the ByteStrings to S3
val s3key: String = DataSyncUtils.getRelativePath(filePath)
val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = S3.multipartUpload("bg0975-cef-ccmedev-data", s3key)

// Step 5: Run the stream
val uploadResult: Future[MultipartUploadResult] = parquetSource
  .via(recordToByteStringFlow)
  .runWith(s3Sink)
private def serializeRecord(record: GenericRecord): ByteString = {
  val writer = new GenericDatumWriter[GenericRecord](record.getSchema)
  val outputStream = new ByteArrayOutputStream()
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(outputStream, null)
  writer.write(record, encoder)
  encoder.flush()
  ByteString(outputStream.toByteArray)
}
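As far as we understand, binaryEncoder writes only the raw datum bytes of each record, with no Avro file header, schema, or sync markers (and no Parquet footer either), so the concatenated bytes that reach S3 are not in any file format a reader recognises. For comparison, a self-describing Avro container file would be built roughly like the sketch below (serializeAsAvroContainer is just an illustrative name of ours, and the result would be an .avro object rather than Parquet):

// Sketch only: build one self-describing Avro container file (header with the
// schema, data blocks, sync markers) in memory from a batch of records.
// Note: this produces an Avro file, not a Parquet file.
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import akka.util.ByteString

def serializeAsAvroContainer(records: Seq[GenericRecord], schema: Schema): ByteString = {
  val out = new ByteArrayOutputStream()
  val dataFileWriter = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
  dataFileWriter.create(schema, out)     // writes the Avro file header
  records.foreach(dataFileWriter.append) // appends each record to the current block
  dataFileWriter.close()                 // flushes the final block
  ByteString(out.toByteArray)
}

(For small files the batch could come from something like parquetSource.runWith(Sink.seq); for large files a streaming writer would be needed rather than buffering everything in memory.)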
We believe the per-record serialization is where we are going wrong. It's a very basic use case of uploading a Parquet file to S3, so any pointers would be much appreciated.
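If the goal is just to land the same Parquet file in S3 unchanged, a byte-for-byte copy of the HDFS file would avoid record-level (de)serialization altogether. A minimal sketch, assuming the same hadoopConfig, s3key and implicit ActorSystem as above (and that HDFS access still needs the ugi.doAs wrapping shown earlier):

// Sketch: stream the Parquet file's raw bytes from HDFS straight into the S3
// multipart upload, so the object in S3 is byte-identical to the HDFS file.
import akka.stream.IOResult
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString
import org.apache.hadoop.fs.{Path => HadoopPath}
import scala.concurrent.Future

val parquetPath = new HadoopPath(filePath)
val fs = parquetPath.getFileSystem(hadoopConfig)

val rawParquetBytes: Source[ByteString, Future[IOResult]] =
  StreamConverters.fromInputStream(() => fs.open(parquetPath))

val copyResult: Future[MultipartUploadResult] =
  rawParquetBytes.runWith(S3.multipartUpload("bg0975-cef-ccmedev-data", s3key))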