diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala
index 59867282..24ae4955 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala
@@ -21,7 +21,7 @@ import za.co.absa.cobrix.cobol.processor.{CobolProcessor, RawRecordProcessor}
 import za.co.absa.cobrix.cobol.reader.VarLenNestedReader
 import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
 import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
-import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
+import za.co.absa.cobrix.cobol.reader.stream.{FSStream, SimpleStream}
 
 import java.io.OutputStream
 
@@ -47,12 +47,25 @@ class CobolProcessorInPlace(readerParameters: ReaderParameters,
     val recordExtractor = CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, inputStream, None)
 
     try {
-      StreamProcessor.processStreamInPlace(copybook,
+      // FSStream exposes the bytes skipped by the file start/end offsets; nothing is copied for other streams
+      val offsetAwareStream = inputStream match {
+        case fsStream: FSStream => Some(fsStream)
+        case _ => None
+      }
+
+      // Copy the bytes that precede the effective stream content
+      offsetAwareStream.foreach(fsStream => outputStream.write(fsStream.getSkippedStartBytes))
+
+      val recordsProcessed = StreamProcessor.processStreamInPlace(copybook,
         options,
         inputStream,
         recordExtractor,
         rawRecordProcessor,
         outputStream)
+
+      // Copy the bytes that follow the effective stream content
+      offsetAwareStream.foreach(fsStream => outputStream.write(fsStream.getSkippedEndBytes))
+      recordsProcessed
     } finally {
       inputStream.close()
     }
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala
index 542fffde..b936fa48 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala
@@ -25,11 +25,7 @@ class FSStream (fileName: 
String, fileStartOffset: Long = 0L, fileEndOffset: Lon
   private val fileSize: Long = new File(fileName).length()
   private val effectiveSize: Long = math.max(0L, fileSize - fileStartOffset - fileEndOffset)
   private var byteIndex = 0L
-
-  // Skip the start offset if specified
-  if (fileStartOffset > 0) {
-    skipFully(fileStartOffset)
-  }
+  private var startOffsetConsumed: Boolean = false
 
   override def size: Long = effectiveSize
 
@@ -39,13 +35,101 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon
 
   override def inputFileName: String = fileName
 
+  /**
+   * Reads and returns the bytes at the beginning of the file that precede the effective stream content.
+   *
+   * This method attempts to read the leading bytes (defined by the file start offset) that appear
+   * before the effective content of the stream. It only performs the read on the first invocation
+   * and if the file start offset is greater than zero. The start offset is marked as consumed before
+   * the read is attempted, so subsequent calls, and calls made after next() has already skipped the
+   * start offset, return an empty array.
+   *
+   * @return an array of bytes containing the skipped start bytes, or an empty array if the start offset
+   *         has already been consumed, the file start offset is zero or negative, or no bytes could be read.
+   * @throws IllegalArgumentException if the file start offset exceeds Int.MaxValue.
+   */
+  def getSkippedStartBytes: Array[Byte] = {
+    if (fileStartOffset > Int.MaxValue)
+      throw new IllegalArgumentException(s"fileStartOffset ($fileStartOffset) exceeds maximum supported value (${Int.MaxValue})")
+    val fileStartOffsetInt = fileStartOffset.toInt
+    if (startOffsetConsumed || fileStartOffsetInt <= 0)
+      Array.empty[Byte]
+    else {
+      startOffsetConsumed = true
+      readSkippedBytes(fileStartOffsetInt)
+    }
+  }
+
+  /**
+   * Reads and returns the bytes at the end of the file that follow the effective stream content.
+   *
+   * This method attempts to read the trailing bytes (defined by the file end offset) that appear
+   * after the effective content of the stream. It only performs the read when the byte index has
+   * reached or exceeded the effective size and the stream has not yet been closed. If the file end
+   * offset is zero or negative, the stream is closed and an empty array is returned. If the start
+   * offset has not been consumed yet, it is skipped first so that the read begins at the trailing bytes.
+   *
+   * @return an array of bytes containing the skipped end bytes, or an empty array if the stream
+   *         has not yet reached the end of the effective content, the stream is already closed,
+   *         the file end offset is zero or negative, or no bytes could be read.
+   * @throws IllegalArgumentException if the file end offset exceeds Int.MaxValue.
+   */
+  def getSkippedEndBytes: Array[Byte] = {
+    if (fileEndOffset > Int.MaxValue)
+      throw new IllegalArgumentException(s"fileEndOffset ($fileEndOffset) exceeds maximum supported value (${Int.MaxValue})")
+    val fileEndOffsetInt = fileEndOffset.toInt
+    if (byteIndex >= effectiveSize && !isClosed) {
+      if (fileEndOffsetInt > 0) {
+        // With empty effective content nothing may have consumed the start offset yet
+        if (!startOffsetConsumed && fileStartOffset > 0)
+          skipFully(fileStartOffset)
+        readSkippedBytes(fileEndOffsetInt)
+      } else {
+        close()
+        Array.empty[Byte]
+      }
+    } else {
+      Array.empty[Byte]
+    }
+  }
+
+  /**
+   * Reads up to `numberOfBytes` bytes from the underlying stream. The read is repeated until the
+   * requested number of bytes has been read or the end of the stream is reached, since a single
+   * read() call is allowed to return fewer bytes than requested.
+   *
+   * @param numberOfBytes the number of bytes to read, expected to be greater than zero.
+   * @return the bytes actually read; shorter than requested only if the stream ended early.
+   */
+  private def readSkippedBytes(numberOfBytes: Int): Array[Byte] = {
+    val buffer = new Array[Byte](numberOfBytes)
+    var totalRead = 0
+    var endOfStream = false
+    while (totalRead < numberOfBytes && !endOfStream) {
+      val bytesRead = bytesStream.read(buffer, totalRead, numberOfBytes - totalRead)
+      if (bytesRead <= 0)
+        endOfStream = true
+      else
+        totalRead += bytesRead
+    }
+    if (totalRead == numberOfBytes) buffer else buffer.take(totalRead)
+  }
+
+  @throws(classOf[IOException])
   override def next(numberOfBytes: Int): Array[Byte] = {
     if (numberOfBytes <= 0)
       throw new IllegalArgumentException("Value of numberOfBytes should be greater than zero.")
 
+    if (!startOffsetConsumed && fileStartOffset > 0) {
+      // Skip the start offset if specified
+      skipFully(fileStartOffset)
+    }
+
     // Check if we've reached the effective end of the stream
     if (byteIndex >= effectiveSize) {
-      close()
+      // Keep the stream open when there are trailing bytes so getSkippedEndBytes can still read them
+      if (fileEndOffset <= 0)
+        close()
       return new Array[Byte](0)
     }
 
@@ -77,6 +161,7 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon
   }
 
   private def skipFully(bytesToSkip: Long): Unit = {
+    startOffsetConsumed = true
     var remaining = math.min(bytesToSkip, fileSize)
     while (remaining > 0) {
       val skipped = bytesStream.skip(remaining)
diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala
index 63cef3dd..c4233cf7 100644
--- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala
+++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala
@@ -137,7 +137,7 @@ class CobolProcessorBuilderSuite extends AnyWordSpec with BinaryFileFixture {
 
         assert(count == 4)
         assert(outputData.sameElements(
-          Array(-16,-15,-14,-13).map(_.toByte)
+          Array(7, 7, 7, -16,-15,-14,-13, 8, 8).map(_.toByte)
         ))
       }
     }
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala
index 96e327e2..9f2aed98 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala
@@ -154,12 +154,13 @@ object SparkCobolProcessor {
 
       val readerParameters = cobolProcessorBuilder.getReaderParameters
       val cobolProcessor = cobolProcessorBuilder.build()
+      val retainStartAndEndOffsets = cobolProcessingStrategy == CobolProcessingStrategy.InPlace
 
       val processor = new SparkCobolProcessor {
         private val sconf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)
 
         override def process(listOfFiles: Seq[String], outputPath: String): Long = {
-          getFileProcessorRdd(listOfFiles, outputPath, cobolProcessor, readerParameters, rawRecordProcessorOpt.get, sconf, numberOfThreads)
+          getFileProcessorRdd(listOfFiles, outputPath, cobolProcessor, readerParameters, rawRecordProcessorOpt.get, sconf, retainStartAndEndOffsets, numberOfThreads)
             .reduce(_ + _)
         }
       }
@@ -190,12 +191,13 @@ object SparkCobolProcessor {
                                   readerParameters: ReaderParameters,
                                   rawRecordProcessor: SerializableRawRecordProcessor,
                                   sconf: SerializableConfiguration,
+                                  retainStartAndEndOffsets: Boolean,
                                   numberOfThreads: Int
                                  )(implicit spark: SparkSession): RDD[Long] = {
     val groupedFiles = listOfFiles.grouped(numberOfThreads).toSeq
     val rdd = spark.sparkContext.parallelize(groupedFiles)
     rdd.map(group => {
-      processListOfFiles(group, outputPath, cobolProcessor, readerParameters, rawRecordProcessor, sconf, numberOfThreads)
+      processListOfFiles(group, outputPath, cobolProcessor, readerParameters, rawRecordProcessor, sconf, retainStartAndEndOffsets, numberOfThreads)
     })
   }
 
@@ -254,6 +256,7 @@ object SparkCobolProcessor {
                                  readerParameters: ReaderParameters,
                                  rawRecordProcessor: SerializableRawRecordProcessor,
                                  sconf: SerializableConfiguration,
+                                 retainStartAndEndOffsets: Boolean,
                                  numberOfThreads: Int
                                 ): Long = {
     val threadPool: ExecutorService = Executors.newFixedThreadPool(numberOfThreads)
@@ -281,7 +284,20 @@ object SparkCobolProcessor {
 
           val recordCount = UsingUtils.using(new FileStreamer(inputFile, sconf.value, fileStartOffset, maximumBytes)) { ifs =>
             UsingUtils.using(new BufferedOutputStream(outputFs.create(outputFile, true))) { ofs =>
-              cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
+              // The bytes skipped by the file start offset are copied verbatim ahead of the records
+              if (fileStartOffset > 0 && retainStartAndEndOffsets) {
+                UsingUtils.using(new FileStreamer(inputFile, sconf.value)) { headerStream =>
+                  ofs.write(headerStream.next(fileStartOffset))
+                }
+              }
+              val recordsProcessed = cobolProcessor.process(ifs, ofs)(rawRecordProcessor)
+              // The bytes skipped by the file end offset are appended after the processed records
+              if (fileEndOffset > 0 && retainStartAndEndOffsets) {
+                UsingUtils.using(new FileStreamer(inputFile, sconf.value, maximumBytes + fileStartOffset)) { trailerStream =>
+                  ofs.write(trailerStream.next(fileEndOffset))
+                }
+              }
+              recordsProcessed
             }
           }
 
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala
index 6f6e39a0..7caa2593 100644
--- 
a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala @@ -38,7 +38,6 @@ import java.io.IOException * @note This class is not thread-safe and should only be accessed from a single thread */ class FileStreamer(filePath: String, hadoopConfig: Configuration, startOffset: Long = 0L, maximumBytes: Long = 0L) extends SimpleStream { - private val logger: Logger = LoggerFactory.getLogger(this.getClass) private val hadoopPath = new Path(filePath) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala index 44d2d2ac..9d48345e 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala @@ -176,13 +176,15 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar val outputData = readBinaryFile(outputFile) assert(outputData.sameElements( - Array(-16, -15, -14, -13).map(_.toByte) + Array(7, 7, 7, -16, -15, -14, -13, 8, 8).map(_.toByte) )) val actual = spark.read .format("cobol") .option("copybook_contents", copybook) .option("record_format", "F") + .option("file_start_offset", "3") + .option("file_end_offset", "2") .option("pedantic", "true") .load(outputFile) .toJSON