Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import za.co.absa.cobrix.cobol.processor.{CobolProcessor, RawRecordProcessor}
import za.co.absa.cobrix.cobol.reader.VarLenNestedReader
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
import za.co.absa.cobrix.cobol.reader.stream.{FSStream, SimpleStream}

import java.io.OutputStream

Expand All @@ -47,12 +47,25 @@ class CobolProcessorInPlace(readerParameters: ReaderParameters,
val recordExtractor = CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, inputStream, None)

try {
StreamProcessor.processStreamInPlace(copybook,
inputStream match {
case stream: FSStream =>
outputStream.write(stream.getSkippedStartBytes)
case _ =>
}

val recordsProcessed = StreamProcessor.processStreamInPlace(copybook,
options,
inputStream,
recordExtractor,
rawRecordProcessor,
outputStream)

inputStream match {
case stream: FSStream =>
outputStream.write(stream.getSkippedEndBytes)
case _ =>
}
recordsProcessed
} finally {
inputStream.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon
private val fileSize: Long = new File(fileName).length()
private val effectiveSize: Long = math.max(0L, fileSize - fileStartOffset - fileEndOffset)
private var byteIndex = 0L

// Skip the start offset if specified
if (fileStartOffset > 0) {
skipFully(fileStartOffset)
}
private var skipped: Boolean = false

override def size: Long = effectiveSize

Expand All @@ -39,13 +35,83 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon

override def inputFileName: String = fileName

/**
 * Reads and returns the bytes at the beginning of the file that precede the effective stream content.
 *
 * The read is performed only on the first invocation and only when the file start offset is
 * greater than zero. The `skipped` flag is set before reading, so subsequent calls return an
 * empty array.
 *
 * The underlying stream is read repeatedly until the requested number of bytes has been
 * collected or the stream is exhausted, because a single `read` call is allowed to return fewer
 * bytes than requested. A short read here would otherwise truncate the returned header and
 * leave the stream positioned before the start of the effective content.
 *
 * NOTE(review): assumes `bytesStream` (declared outside this block) follows the
 * `java.io.InputStream.read(byte[], int, int)` contract - confirm.
 *
 * @return an array of bytes containing the skipped start bytes (shorter than the start offset
 *         only if the stream ended early), or an empty array if the bytes have already been
 *         read or skipped, the file start offset is zero or negative, or no bytes could be read.
 * @throws IllegalArgumentException if the file start offset exceeds `Int.MaxValue`.
 */
def getSkippedStartBytes: Array[Byte] = {
  if (fileStartOffset > Int.MaxValue)
    throw new IllegalArgumentException(s"fileStartOffset ($fileStartOffset) exceeds maximum supported value (${Int.MaxValue})")

  // The sign is checked on the Long value, before narrowing, so a large negative offset
  // cannot wrap around into a positive Int.
  if (skipped || fileStartOffset <= 0) {
    Array.empty[Byte]
  } else {
    skipped = true
    val bytesToRead = fileStartOffset.toInt
    val buffer = new Array[Byte](bytesToRead)

    // Keep reading until the buffer is full or the stream reports no more data.
    var filled = 0
    var exhausted = false
    while (filled < bytesToRead && !exhausted) {
      val count = bytesStream.read(buffer, filled, bytesToRead - filled)
      if (count <= 0) {
        exhausted = true
      } else {
        filled += count
      }
    }

    if (filled == bytesToRead) buffer else buffer.take(filled)
  }
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/**
 * Reads and returns the bytes at the end of the file that follow the effective stream content.
 *
 * The read is attempted only when the byte index has reached or exceeded the effective size and
 * the stream has not been closed. If those conditions hold but the file end offset is zero or
 * negative, the stream is closed and an empty array is returned.
 *
 * The underlying stream is read repeatedly until the requested number of bytes has been
 * collected or the stream is exhausted, because a single `read` call is allowed to return fewer
 * bytes than requested; relying on one call could silently drop part of the trailer.
 *
 * This method does not close the stream after reading the trailing bytes.
 *
 * NOTE(review): assumes `bytesStream` (declared outside this block) follows the
 * `java.io.InputStream.read(byte[], int, int)` contract, and that the stream is positioned at
 * the end of the effective content once `byteIndex >= effectiveSize` - confirm.
 *
 * @return an array of bytes containing the skipped end bytes (shorter than the end offset only
 *         if the stream ended early), or an empty array if the stream has not yet reached the
 *         end of the effective content, the stream is already closed, the file end offset is
 *         zero or negative, or no bytes could be read.
 * @throws IllegalArgumentException if the file end offset exceeds `Int.MaxValue`.
 */
def getSkippedEndBytes: Array[Byte] = {
  if (fileEndOffset > Int.MaxValue)
    throw new IllegalArgumentException(s"fileEndOffset ($fileEndOffset) exceeds maximum supported value (${Int.MaxValue})")

  if (byteIndex < effectiveSize || isClosed) {
    // Effective content is not fully consumed yet, or there is nothing left to read from.
    Array.empty[Byte]
  } else if (fileEndOffset <= 0) {
    // No trailer is configured: the stream is no longer needed.
    close()
    Array.empty[Byte]
  } else {
    val bytesToRead = fileEndOffset.toInt
    val buffer = new Array[Byte](bytesToRead)

    // Keep reading until the buffer is full or the stream reports no more data.
    var filled = 0
    var exhausted = false
    while (filled < bytesToRead && !exhausted) {
      val count = bytesStream.read(buffer, filled, bytesToRead - filled)
      if (count <= 0) {
        exhausted = true
      } else {
        filled += count
      }
    }

    if (filled == bytesToRead) buffer else buffer.take(filled)
  }
}


@throws(classOf[IOException])
override def next(numberOfBytes: Int): Array[Byte] = {
if (numberOfBytes <= 0) throw new IllegalArgumentException("Value of numberOfBytes should be greater than zero.")

if (!skipped && fileStartOffset > 0) {
// Skip the start offset if specified
skipFully(fileStartOffset)
}

// Check if we've reached the effective end of the stream
if (byteIndex >= effectiveSize) {
close()
if (fileEndOffset <= 0)
close()
return new Array[Byte](0)
}

Expand Down Expand Up @@ -77,6 +143,7 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon
}

private def skipFully(bytesToSkip: Long): Unit = {
skipped = true
var remaining = math.min(bytesToSkip, fileSize)
while (remaining > 0) {
val skipped = bytesStream.skip(remaining)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class CobolProcessorBuilderSuite extends AnyWordSpec with BinaryFileFixture {

assert(count == 4)
assert(outputData.sameElements(
Array(-16,-15,-14,-13).map(_.toByte)
Array(7, 7, 7, -16,-15,-14,-13, 8, 8).map(_.toByte)
))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,13 @@ object SparkCobolProcessor {

val readerParameters = cobolProcessorBuilder.getReaderParameters
val cobolProcessor = cobolProcessorBuilder.build()
val retainStartAndEndOffsets = cobolProcessingStrategy == CobolProcessingStrategy.InPlace

val processor = new SparkCobolProcessor {
private val sconf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)

override def process(listOfFiles: Seq[String], outputPath: String): Long = {
getFileProcessorRdd(listOfFiles, outputPath, cobolProcessor, readerParameters, rawRecordProcessorOpt.get, sconf, numberOfThreads)
getFileProcessorRdd(listOfFiles, outputPath, cobolProcessor, readerParameters, rawRecordProcessorOpt.get, sconf, retainStartAndEndOffsets, numberOfThreads)
.reduce(_ + _)
}
}
Expand Down Expand Up @@ -190,12 +191,13 @@ object SparkCobolProcessor {
readerParameters: ReaderParameters,
rawRecordProcessor: SerializableRawRecordProcessor,
sconf: SerializableConfiguration,
retainStartAndEndOffsets: Boolean,
numberOfThreads: Int
)(implicit spark: SparkSession): RDD[Long] = {
val groupedFiles = listOfFiles.grouped(numberOfThreads).toSeq
val rdd = spark.sparkContext.parallelize(groupedFiles)
rdd.map(group => {
processListOfFiles(group, outputPath, cobolProcessor, readerParameters, rawRecordProcessor, sconf, numberOfThreads)
processListOfFiles(group, outputPath, cobolProcessor, readerParameters, rawRecordProcessor, sconf, retainStartAndEndOffsets, numberOfThreads)
})
}

Expand Down Expand Up @@ -254,6 +256,7 @@ object SparkCobolProcessor {
readerParameters: ReaderParameters,
rawRecordProcessor: SerializableRawRecordProcessor,
sconf: SerializableConfiguration,
retainStartAndEndOffsets: Boolean,
numberOfThreads: Int
): Long = {
val threadPool: ExecutorService = Executors.newFixedThreadPool(numberOfThreads)
Expand Down Expand Up @@ -281,7 +284,18 @@ object SparkCobolProcessor {

val recordCount = UsingUtils.using(new FileStreamer(inputFile, sconf.value, fileStartOffset, maximumBytes)) { ifs =>
UsingUtils.using(new BufferedOutputStream(outputFs.create(outputFile, true))) { ofs =>
cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
if (fileStartOffset > 0 && retainStartAndEndOffsets) {
val tempStream = new FileStreamer(inputFile, sconf.value)
ofs.write(tempStream.next(fileStartOffset))
tempStream.close()
}
val recordsProcessed = cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
if (fileEndOffset > 0 && retainStartAndEndOffsets) {
val tempStream = new FileStreamer(inputFile, sconf.value, maximumBytes + fileStartOffset)
ofs.write(tempStream.next(fileEndOffset))
tempStream.close()
}
recordsProcessed
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import java.io.IOException
* @note This class is not thread-safe and should only be accessed from a single thread
*/
class FileStreamer(filePath: String, hadoopConfig: Configuration, startOffset: Long = 0L, maximumBytes: Long = 0L) extends SimpleStream {

private val logger: Logger = LoggerFactory.getLogger(this.getClass)

private val hadoopPath = new Path(filePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,15 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar
val outputData = readBinaryFile(outputFile)

assert(outputData.sameElements(
Array(-16, -15, -14, -13).map(_.toByte)
Array(7, 7, 7, -16, -15, -14, -13, 8, 8).map(_.toByte)
))

val actual = spark.read
.format("cobol")
.option("copybook_contents", copybook)
.option("record_format", "F")
.option("file_start_offset", "3")
.option("file_end_offset", "2")
.option("pedantic", "true")
.load(outputFile)
.toJSON
Expand Down
Loading