From b7ab55faee89149d165dcec8b5d4c69130ff766c Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 7 May 2026 07:39:47 +0200 Subject: [PATCH 1/7] #738 Add a new journaling table for storing executions on pipeline level instead of task level. --- .../pramen/api/status/PipelineStatus.scala | 16 +++-- .../pramen/core/bookkeeper/Bookkeeper.scala | 12 ++-- .../co/absa/pramen/core/journal/Journal.scala | 4 +- .../pramen/core/journal/JournalDynamoDB.scala | 6 +- .../core/journal/JournalHadoopCsv.scala | 7 +- .../core/journal/JournalHadoopDeltaPath.scala | 19 ++++- .../journal/JournalHadoopDeltaTable.scala | 29 ++++++-- .../pramen/core/journal/JournalJdbc.scala | 17 ++++- .../pramen/core/journal/JournalMongoDb.scala | 38 ++++++---- .../pramen/core/journal/JournalNull.scala | 5 +- .../pramen/core/journal/model/Execution.scala | 41 +++++++++++ .../core/journal/model/ExecutionsTable.scala | 61 ++++++++++++++++ .../za/co/absa/pramen/core/rdb/PramenDb.scala | 11 ++- .../absa/pramen/core/runner/AppRunner.scala | 1 + .../pramen/core/state/PipelineState.scala | 3 + .../pramen/core/state/PipelineStateImpl.scala | 41 ++++++++++- .../absa/pramen/core/GlueIcebergSuite.scala | 72 +++++++++++++++++++ .../core/mocks/journal/JournalMock.scala | 5 +- .../core/mocks/state/PipelineStateSpy.scala | 6 ++ .../JournalHadoopDeltaPathLongSuite.scala | 2 +- .../tests/journal/JournalMongoDbSuite.scala | 8 +-- .../tests/utils/hive/HiveHelperSqlSuite.scala | 2 +- 22 files changed, 361 insertions(+), 45 deletions(-) create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/GlueIcebergSuite.scala diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/PipelineStatus.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/PipelineStatus.scala index 
a094b04f1..13afdf967 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/PipelineStatus.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/PipelineStatus.scala @@ -19,8 +19,16 @@ package za.co.absa.pramen.api.status sealed trait PipelineStatus object PipelineStatus { - case object Success extends PipelineStatus - case object Warning extends PipelineStatus - case object PartialSuccess extends PipelineStatus - case object Failure extends PipelineStatus + case object Success extends PipelineStatus { + override def toString: String = "succeeded" + } + case object Warning extends PipelineStatus{ + override def toString: String = "succeeded with warnings" + } + case object PartialSuccess extends PipelineStatus{ + override def toString: String = "partially succeeded" + } + case object Failure extends PipelineStatus{ + override def toString: String = "failed" + } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala index 8f0602754..c92e76932 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala @@ -201,13 +201,15 @@ object Bookkeeper { case HadoopFormat.Delta => bookkeepingConfig.deltaTablePrefix match { case Some(tablePrefix) => - val fullTableName = JournalHadoopDeltaTable.getFullTableName(bookkeepingConfig.deltaDatabase, tablePrefix) - log.info(s"Using Delta Lake managed table '$fullTableName' for the journal.") + val journalTableName = JournalHadoopDeltaTable.getFullTableName(bookkeepingConfig.deltaDatabase, tablePrefix, "journal") + val executionsTableName = JournalHadoopDeltaTable.getFullTableName(bookkeepingConfig.deltaDatabase, tablePrefix, "executions") + log.info(s"Using Delta Lake managed table '$journalTableName' and '$executionsTableName' for the journal.") new 
JournalHadoopDeltaTable(bookkeepingConfig.deltaDatabase, tablePrefix) case None => - val path = bookkeepingConfig.bookkeepingLocation.get + "/journal" - log.info(s"Using Delta Lake for the journal at $path") - new JournalHadoopDeltaPath(path) + val journalPath = bookkeepingConfig.bookkeepingLocation.get + "/journal" + val executionsPath = bookkeepingConfig.bookkeepingLocation.get + "/executions" + log.info(s"Using Delta Lake for the journal at '$journalPath' and '$executionsPath'") + new JournalHadoopDeltaPath(journalPath, executionsPath) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/Journal.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/Journal.scala index b7d70ff81..6605a4778 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/Journal.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/Journal.scala @@ -16,7 +16,7 @@ package za.co.absa.pramen.core.journal -import za.co.absa.pramen.core.journal.model.TaskCompleted +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} import java.time.Instant @@ -27,6 +27,8 @@ trait Journal extends AutoCloseable { def addEntry(entry: TaskCompleted): Unit + def addPipelineEntry(execution: Execution): Unit + def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] override def close(): Unit = {} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala index a9453d3ed..df27c07f7 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala @@ -24,7 +24,7 @@ import software.amazon.awssdk.services.dynamodb.model._ import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.bookkeeper.BookkeeperDynamoDb import 
za.co.absa.pramen.core.bookkeeper.BookkeeperDynamoDb.waitForTableActive -import za.co.absa.pramen.core.journal.model.TaskCompleted +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} import java.net.URI import java.time.{Instant, LocalDate} @@ -119,6 +119,10 @@ class JournalDynamoDB private ( } } + override def addPipelineEntry(execution: Execution): Unit = { + // ToDo add the implementation for DynamoDB + } + /** * Get journal entries within a time range. */ diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala index a27b7a0e5..4d68b3458 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import za.co.absa.pramen.core.app.config.InfoDateConfig -import za.co.absa.pramen.core.journal.model.{TaskCompleted, TaskCompletedCsv} +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted, TaskCompletedCsv} import za.co.absa.pramen.core.utils.{CsvUtils, FsUtils, SparkUtils} import java.time.{Instant, LocalDate} @@ -93,6 +93,11 @@ class JournalHadoopCsv(journalPath: String) )) } + override def addPipelineEntry(execution: Execution): Unit = { + // ToDo add the implementation for CSV + } + + private def serializeCompletedTaskCsv(t: TaskCompleted): String = { val periodBegin = t.periodBegin.format(dateFormatter) val periodEnd = t.periodEnd.format(dateFormatter) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala index 6493029a8..05aefa69a 100644 --- 
a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala @@ -20,12 +20,12 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.functions._ import org.apache.spark.sql.{SaveMode, SparkSession} -import za.co.absa.pramen.core.journal.model.TaskCompleted +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} import za.co.absa.pramen.core.utils.FsUtils import java.time.Instant -class JournalHadoopDeltaPath(journalPath: String) +class JournalHadoopDeltaPath(journalPath: String, executionsPath: String) (implicit spark: SparkSession) extends Journal { import spark.implicits._ @@ -47,6 +47,21 @@ class JournalHadoopDeltaPath(journalPath: String) .save(journalPath) } + override def addPipelineEntry(execution: Execution): Unit = { + val recordDf = Seq(execution).toDS().toDF() + + if (spark.version.split('.').head.toInt < 3) { + throw new IllegalArgumentException("Delta Lake for bookkeeping is only available in Spark 3+") + } + + recordDf + .write + .mode(SaveMode.Append) + .option("format", "delta") + .option("mergeSchema", "true") + .save(executionsPath) + } + override def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] = { import spark.implicits._ diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaTable.scala index 18e096671..33d5c18da 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaTable.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.journal import org.apache.spark.sql.functions._ import org.apache.spark.sql.{SaveMode, SparkSession} -import za.co.absa.pramen.core.journal.model.TaskCompleted +import 
za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} import java.time.Instant @@ -40,18 +40,33 @@ class JournalHadoopDeltaTable(database: Option[String], .mode(SaveMode.Append) .option("format", "delta") .option("mergeSchema", "true") - .saveAsTable(getFullTableName(database, tablePrefix)) + .saveAsTable(getFullTableName(database, tablePrefix, "journal")) + } + + override def addPipelineEntry(execution: Execution): Unit = { + val recordDf = Seq(execution).toDS().toDF() + + if (spark.version.split('.').head.toInt < 3) { + throw new IllegalArgumentException("Delta Lake for bookkeeping is only available in Spark 3+") + } + + recordDf + .write + .mode(SaveMode.Append) + .option("format", "delta") + .option("mergeSchema", "true") + .saveAsTable(getFullTableName(database, tablePrefix, "executions")) } override def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] = { import spark.implicits._ - if (!spark.catalog.tableExists(getFullTableName(database, tablePrefix))) { + if (!spark.catalog.tableExists(getFullTableName(database, tablePrefix, "journal"))) { return Seq.empty[TaskCompleted] } val df = spark - .table(getFullTableName(database, tablePrefix)) + .table(getFullTableName(database, tablePrefix, "journal")) df.filter(col("finishedAt") >= from.getEpochSecond && col("finishedAt") <= to.getEpochSecond) .orderBy(col("finishedAt")) @@ -61,10 +76,10 @@ class JournalHadoopDeltaTable(database: Option[String], } object JournalHadoopDeltaTable { - def getFullTableName(databaseOpt: Option[String], tablePrefix: String): String = { + def getFullTableName(databaseOpt: Option[String], tablePrefix: String, tableName: String): String = { databaseOpt match { - case Some(db) => s"$db.${tablePrefix}journal" - case None => s"${tablePrefix}journal" + case Some(db) => s"$db.$tablePrefix$tableName" + case None => s"$tablePrefix$tableName" } } } \ No newline at end of file diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala index 0558af397..d9a5afa6e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala @@ -20,7 +20,7 @@ import org.slf4j.LoggerFactory import slick.jdbc.JdbcBackend.Database import slick.jdbc.JdbcProfile import za.co.absa.pramen.core.app.config.InfoDateConfig -import za.co.absa.pramen.core.journal.model.{JournalTable, JournalTask, TaskCompleted} +import za.co.absa.pramen.core.journal.model._ import za.co.absa.pramen.core.utils.SlickUtils import java.time.{Instant, LocalDate} @@ -38,6 +38,10 @@ class JournalJdbc(db: Database, slickProfile: JdbcProfile) extends Journal { override val profile = slickProfile } + private val executionsTable = new ExecutionsTable { + override val profile = slickProfile + } + override def addEntry(entry: TaskCompleted): Unit = { val periodBegin = entry.periodBegin.format(dateFormatter) val periodEnd = entry.periodEnd.format(dateFormatter) @@ -76,6 +80,17 @@ class JournalJdbc(db: Database, slickProfile: JdbcProfile) extends Journal { } } + override def addPipelineEntry(execution: Execution): Unit = { + try { + slickUtils.ensureDbConnected(db) + db.run( + executionsTable.records += execution + ).execute() + } catch { + case NonFatal(ex) => log.error(s"Unable to write to the executions table.", ex) + } + } + override def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] = { val fromSec = from.getEpochSecond val toSec = to.getEpochSecond diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalMongoDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalMongoDb.scala index e1e1c7c0c..4ecacbaa4 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalMongoDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalMongoDb.scala @@ -16,8 +16,6 @@ package 
za.co.absa.pramen.core.journal -import java.time.Instant - import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries} import org.bson.codecs.configuration.CodecRegistry import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY @@ -25,13 +23,15 @@ import org.mongodb.scala.bson.codecs.Macros._ import org.mongodb.scala.model.Filters import za.co.absa.pramen.core.dao.MongoDb import za.co.absa.pramen.core.dao.model.{ASC, IndexField} -import za.co.absa.pramen.core.journal.model.TaskCompleted +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} import za.co.absa.pramen.core.mongo.MongoDbConnection +import java.time.Instant import scala.util.control.NonFatal object JournalMongoDb { - val collectionName = "journal" + val tasksCollectionName = "journal" + val executionsCollectionName = "executions" } class JournalMongoDb(mongoDbConnection: MongoDbConnection) extends Journal { @@ -39,15 +39,20 @@ class JournalMongoDb(mongoDbConnection: MongoDbConnection) extends Journal { import JournalMongoDb._ import za.co.absa.pramen.core.dao.ScalaMongoImplicits._ - private val codecRegistry: CodecRegistry = fromRegistries(fromProviders(classOf[TaskCompleted]), DEFAULT_CODEC_REGISTRY) + private val codecRegistry: CodecRegistry = fromRegistries(fromProviders(classOf[TaskCompleted], classOf[Execution]), DEFAULT_CODEC_REGISTRY) private val db = mongoDbConnection.getDatabase initCollection() - private val collection = db.getCollection[TaskCompleted](collectionName).withCodecRegistry(codecRegistry) + private val tasksCollection = db.getCollection[TaskCompleted](tasksCollectionName).withCodecRegistry(codecRegistry) + private val executionsCollection = db.getCollection[Execution](executionsCollectionName).withCodecRegistry(codecRegistry) override def addEntry(entry: TaskCompleted): Unit = { - collection.insertOne(entry).execute() + tasksCollection.insertOne(entry).execute() + } + + override def addPipelineEntry(execution: Execution): Unit = { + 
executionsCollection.insertOne(execution).execute() } override def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] = { @@ -59,17 +64,26 @@ class JournalMongoDb(mongoDbConnection: MongoDbConnection) extends Journal { Filters.gte("finishedAt", instant0.getEpochSecond), Filters.lte("finishedAt", instant1.getEpochSecond) ) - collection.find(filter).execute() + tasksCollection.find(filter).execute() } private def initCollection(): Unit = { try { val d = new MongoDb(db) - if (!d.doesCollectionExists(collectionName)) { - d.createCollection(collectionName) - d.createIndex(collectionName, IndexField("startedAt", ASC) :: Nil) - d.createIndex(collectionName, IndexField("finishedAt", ASC) :: Nil) + if (!d.doesCollectionExists(tasksCollectionName)) { + d.createCollection(tasksCollectionName) + d.createIndex(tasksCollectionName, IndexField("startedAt", ASC) :: Nil) + d.createIndex(tasksCollectionName, IndexField("finishedAt", ASC) :: Nil) + } + if (!d.doesCollectionExists(executionsCollectionName)) { + d.createCollection(executionsCollectionName) + d.createIndex(executionsCollectionName, IndexField("startedAt", ASC) :: Nil) + d.createIndex(executionsCollectionName, IndexField("finishedAt", ASC) :: Nil) + d.createIndex(executionsCollectionName, IndexField("batchId", ASC) :: Nil) + d.createIndex(executionsCollectionName, IndexField("computeEngineId", ASC) :: Nil) + d.createIndex(executionsCollectionName, IndexField("pipelineId", ASC) :: Nil) } + } catch { case NonFatal(ex) => throw new RuntimeException(s"Unable to connect to MongoDb instance: ${mongoDbConnection.getConnectionString}", ex) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalNull.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalNull.scala index b02bea6b1..403671aef 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalNull.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalNull.scala @@ -16,7 +16,8 @@ 
package za.co.absa.pramen.core.journal -import za.co.absa.pramen.core.journal.model.TaskCompleted +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} + import java.time.Instant /** @@ -26,5 +27,7 @@ import java.time.Instant class JournalNull extends Journal { override def addEntry(entry: TaskCompleted): Unit = {} + override def addPipelineEntry(pipelineStatus: Execution): Unit = {} + override def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] = Nil } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala new file mode 100644 index 000000000..fbe4d7c60 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.pramen.core.journal.model + +case class Execution( + pipelineId: String, + pipelineName: String, + environmentName: String, + batchId: Long, + sparkApplicationId: String, + computeEngineId: Option[String], + tenant: Option[String], + country: Option[String], + runDateFrom: String, + runDateTo: Option[String], + startedAt: Long, + finishedAt: Long, + numberOfExecutorsMin: Option[Int], + numberOfExecutorsMax: Option[Int], + executorType: Option[String], + status: String, + isRerun: Boolean, + attemptNumber: Int, + numberOfAttempts: Int, + failureReason: Option[String], + additionalOptions: Option[String] + ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala new file mode 100644 index 000000000..41746a0c5 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.pramen.core.journal.model + +import slick.jdbc.JdbcProfile + +trait ExecutionsTable { + val profile: JdbcProfile + import profile.api._ + + class ExecutionsRecords(tag: Tag) extends Table[Execution](tag, "executions") { + def pipelineId = column[String]("pipeline_id", O.Length(40)) + def pipelineName = column[String]("pipeline_name", O.Length(200)) + def environmentName = column[String]("environment_name", O.Length(128)) + def batchId = column[Long]("batch_id") + def sparkApplicationId = column[String]("spark_application_id", O.Length(128)) + def computeEngineId = column[Option[String]]("compute_engine_id", O.Length(128)) + def tenant = column[Option[String]]("tenant", O.Length(200)) + def country = column[Option[String]]("country", O.Length(50)) + def runDateFrom = column[String]("run_date_from", O.Length(20)) + def runDateTo = column[Option[String]]("run_date_to", O.Length(20)) + def startedAt = column[Long]("started_at") + def finishedAt = column[Long]("finished_at") + def numberOfExecutorsMin = column[Option[Int]]("number_of_executors_min") + def numberOfExecutorsMax = column[Option[Int]]("number_of_executors_max") + def executorType = column[Option[String]]("executor_type", O.Length(128)) + def status = column[String]("status", O.Length(50)) + def isRerun = column[Boolean]("is_rerun") + def attemptNumber = column[Int]("attempt_number") + def numberOfAttempts = column[Int]("number_of_attempts") + def failureReason = column[Option[String]]("failure_reason") + def additionalOptions = column[Option[String]]("additional_options") + + def * = (pipelineId, pipelineName, environmentName, batchId, sparkApplicationId, + computeEngineId, tenant, country, runDateFrom, runDateTo, startedAt, finishedAt, numberOfExecutorsMin, + numberOfExecutorsMax, executorType, status, isRerun, attemptNumber, numberOfAttempts, failureReason, + additionalOptions) <> (Execution.tupled, Execution.unapply) + + def idx1 = index("idx_exec_started_at", startedAt, unique
= false) + def idx2 = index("idx_exec_finished_at", finishedAt, unique = false) + def idx3 = index("idx_exec_batchid", batchId, unique = false) + def idx4 = index("idx_exec_compute_engine_id", computeEngineId, unique = false) + def idx5 = index("idx_exec_pipeline_id", pipelineId, unique = false) + } + + lazy val records = TableQuery[ExecutionsRecords] +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala index d00f24ec1..056ac3f1c 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala @@ -22,7 +22,7 @@ import slick.jdbc.{JdbcBackend, JdbcProfile} import slick.util.AsyncExecutor import za.co.absa.pramen.api.Pramen import za.co.absa.pramen.core.bookkeeper.model.{BookkeepingTable, MetadataTable, OffsetTable, SchemaTable} -import za.co.absa.pramen.core.journal.model.JournalTable +import za.co.absa.pramen.core.journal.model.{ExecutionsTable, JournalTable} import za.co.absa.pramen.core.lock.model.LockTicketTable import za.co.absa.pramen.core.rdb.PramenDb.MODEL_VERSION import za.co.absa.pramen.core.reader.JdbcUrlSelector @@ -53,6 +53,9 @@ class PramenDb(val jdbcConfig: JdbcConfig, val journalTable: JournalTable = new JournalTable { override val profile = slickProfile } + val executionsTable: ExecutionsTable = new ExecutionsTable { + override val profile = slickProfile + } val lockTicketTable: LockTicketTable = new LockTicketTable { override val profile = slickProfile } @@ -121,6 +124,10 @@ class PramenDb(val jdbcConfig: JdbcConfig, addColumn(bookkeepingTable.records.baseTableRow.tableName, "appended_record_count", "bigint") addColumn(journalTable.records.baseTableRow.tableName, "batch_id", "bigint") } + + if (dbVersion < 10) { + initTable(executionsTable.records.schema) + } } private def initTable(schema: slickProfile.SchemaDescription): Unit = { @@ -165,7 +172,7 @@ object
PramenDb { private val log = LoggerFactory.getLogger(this.getClass) private val conf = Pramen.getConfig - val MODEL_VERSION = 9 + val MODEL_VERSION = 10 val DEFAULT_RETRIES: Int = conf.getInt("pramen.internal.connection.retries.default") val BACKOFF_MIN_MS: Int = conf.getInt("pramen.internal.connection.backoff.min.ms") val BACKOFF_MAX_MS: Int = conf.getInt("pramen.internal.connection.backoff.max.ms") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala index fcfeabbf5..0c6ca2872 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala @@ -70,6 +70,7 @@ object AppRunner { _ <- logBanner(spark) _ <- logExecutorNodes(conf, state, spark) appContext <- createAppContext(conf, state, spark) + _ <- Try { state.setJournal(appContext.journal) } taskRunner <- createTaskRunner(conf, state, appContext, spark.sparkContext.applicationId) pipeline <- getPipelineDef(conf, state, appContext) _ <- addSinkTables(state, pipeline, appContext) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala index cd61e3efe..a7fdb6a64 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala @@ -17,6 +17,7 @@ package za.co.absa.pramen.core.state import za.co.absa.pramen.api.status.{PipelineStateSnapshot, TaskResult} +import za.co.absa.pramen.core.journal.Journal trait PipelineState extends AutoCloseable { def getState: PipelineStateSnapshot @@ -33,6 +34,8 @@ trait PipelineState extends AutoCloseable { def setSparkAppId(sparkAppId: String): Unit + def setJournal(journal: Journal): Unit + def addTaskCompletion(statuses: Seq[TaskResult]): Unit def getExitCode: Int diff 
--git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala index e4c671b3c..afa7f39e7 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala @@ -21,11 +21,13 @@ import org.slf4j.{Logger, LoggerFactory} import sun.misc.Signal import za.co.absa.pramen.api.status.RunStatus.{NotRan, Succeeded} import za.co.absa.pramen.api.status._ -import za.co.absa.pramen.api.{NotificationBuilder, PipelineInfo, PipelineNotificationTarget} +import za.co.absa.pramen.api.{NotificationBuilder, PipelineInfo, PipelineNotificationTarget, RunMode} import za.co.absa.pramen.core.app.config.RuntimeConfig.{DRY_RUN, EMAIL_IF_NO_CHANGES, UNDERCOVER} import za.co.absa.pramen.core.app.config.{HookConfig, RuntimeConfig} import za.co.absa.pramen.core.config.Keys.{GOOD_THROUGHPUT_RPS, WARN_THROUGHPUT_RPS} import za.co.absa.pramen.core.exceptions.OsSignalException +import za.co.absa.pramen.core.journal.Journal +import za.co.absa.pramen.core.journal.model.Execution import za.co.absa.pramen.core.lock.TokenLockRegistry import za.co.absa.pramen.core.metastore.peristence.{TransientJobManager, TransientTableManager} import za.co.absa.pramen.core.notify.PipelineNotificationTargetFactory @@ -70,6 +72,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification @volatile private var customShutdownHookCanRun = false @volatile private var sparkAppId: Option[String] = None @volatile private var warningFlag: Boolean = false + @volatile private var journalOpt: Option[Journal] = None init() @@ -178,6 +181,10 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification this.sparkAppId = Option(sparkAppId) } + override def setJournal(journal: Journal): Unit = synchronized { + this.journalOpt = Option(journal) + } + override def 
addTaskCompletion(statuses: Seq[TaskResult]): Unit = synchronized { taskResults ++= statuses.filter(_.runStatus != NotRan) if (statuses.exists(_.runStatus.isFailure)) { @@ -209,6 +216,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification sendPipelineNotifications() runCustomShutdownHook() removeSignalHandlers() + addJournalEntry() sendNotificationEmail() TokenLockRegistry.releaseAllLocks() } @@ -277,6 +285,37 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification } } + protected def addJournalEntry(): Unit = { + val pipelineInfo = getPipelineInfo + journalOpt.foreach { journal => + val execution = Execution( + pipelineId, + pipelineName, + environmentName, + batchId, + sparkAppId.getOrElse(""), + None, + tenant, + country, + runtimeConfig.runDate.toString, + runtimeConfig.runDateTo.map(_.toString), + startedInstant.getEpochSecond, + finishedInstant.getOrElse(Instant.now()).getEpochSecond, + None, + None, + None, + pipelineInfo.status.toString, + runtimeConfig.isRerun || runtimeConfig.historicalRunMode == RunMode.ForceRun, + 1, + 1, + pipelineInfo.failureException.flatMap(ex => Option(ex.getMessage)).map(_.take(1000)), + None + ) + + scala.util.Try(journal.addPipelineEntry(execution)).failed.foreach(ex => log.error("Unable to write the pipeline execution to the journal.", ex)) + } + } + protected def sendNotificationEmail(): Unit = { failureException.foreach(ex => log.error(s"The job has FAILED.", ex)) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/GlueIcebergSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/GlueIcebergSuite.scala new file mode 100644 index 000000000..2aada79c3 --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/GlueIcebergSuite.scala @@ -0,0 +1,72 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.script + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.slf4j.LoggerFactory + +import java.io.ByteArrayOutputStream + +/** + * This is the Glue job to use for running Pramen on Glue. + * + * See documentation of AqueductPramenJob in 'pramen-components' module for the info on usage. + */ + +object MyApp { + private val log = LoggerFactory.getLogger(this.getClass) + + def showString(df: DataFrame, numRows: Int = 20): String = { + val outCapture = new ByteArrayOutputStream + Console.withOut(outCapture) { + df.show(numRows, truncate = false) + } + new String(outCapture.toByteArray).replace("\r\n", "\n") + } + + // spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions + // --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog + // --conf spark.sql.catalog.glue_catalog.warehouse=s3://aqueduct-pipelines-dev-application-data/iceberg-warehouse + // --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog + // --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO + // --conf spark.sql.defaultCatalog=glue_catalog + def main(args: Array[String]): Unit = { + val spark: SparkSession = SparkSession.builder() + .appName("test") + .config("spark.sql.parquet.writeLegacyFormat", "true") + .config("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY") + .config("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY") + .config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.sql.catalog.glue_catalog.warehouse", "s3://aqueduct-pipelines-uat-application-data/iceberg-warehouse") + .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") + .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") + .config("spark.sql.catalog.glue_catalog.s3.access-points.ursamajor-afs1-dev-edla-aqdt-za", "arn:aws:s3:af-south-1:131405913869:accesspoint/aqdt-za-npintdeaqueduct") + .config("spark.sql.defaultCatalog", "glue_catalog") + .getOrCreate() + + //log.error(showString(spark.catalog.listTables("edla_dev_aqdt_za_publish_rl").toDF())) + //log.error(showString(spark.table("glue_catalog.edla_dev_aqdt_za_publish_rl.aq_journal_iceberg4"))) + + //spark.sql( + // """ + // | ALTER TABLE glue_catalog.edla_dev_aqdt_za_publish_rl.aq_journal_iceberg4 CREATE TAG my_tag AS OF VERSION 7314587597447473054 + // |""".stripMargin).collect() + + val df1 = spark.sql("SELECT * FROM glue_catalog.edla_dev_aqdt_za_publish_rl.aq_journal_iceberg4.refs") + log.error(showString(df1)) + } +} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/journal/JournalMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/journal/JournalMock.scala index 6a4fce5b8..8a45f2bba 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/journal/JournalMock.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/journal/JournalMock.scala @@ -17,16 +17,19 @@ package za.co.absa.pramen.core.mocks.journal import za.co.absa.pramen.core.journal.Journal -import za.co.absa.pramen.core.journal.model.TaskCompleted +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} import java.time.Instant import scala.collection.mutable.ListBuffer class JournalMock extends Journal { val entries: ListBuffer[TaskCompleted] = new ListBuffer[TaskCompleted] + val executions: ListBuffer[Execution] = new
ListBuffer[Execution] override def addEntry(entry: TaskCompleted): Unit = entries += entry + override def addPipelineEntry(execution: Execution): Unit = executions += execution + override def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] = entries.filter(e => e.finishedAt >= from.getEpochSecond && e.finishedAt <= to.getEpochSecond).toSeq } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala index fd68398bd..73659f595 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala @@ -17,6 +17,7 @@ package za.co.absa.pramen.core.mocks.state import za.co.absa.pramen.api.status.{PipelineStateSnapshot, TaskResult} +import za.co.absa.pramen.core.journal.Journal import za.co.absa.pramen.core.mocks.{PipelineInfoFactory, PipelineStateSnapshotFactory} import za.co.absa.pramen.core.state.PipelineState @@ -32,6 +33,7 @@ class PipelineStateSpy extends PipelineState { val completedStatuses = new ListBuffer[TaskResult] var closeCalled = 0 var sparkAppId: Option[String] = None + var journalOpt: Option[Journal] = None override def getState: PipelineStateSnapshot = { PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(PipelineInfoFactory.getDummyPipelineInfo(sparkApplicationId = sparkAppId), @@ -62,6 +64,10 @@ class PipelineStateSpy extends PipelineState { this.sparkAppId = Option(sparkAppId) } + override def setJournal(journal: Journal): Unit = synchronized { + this.journalOpt = Option(journal) + } + override def addTaskCompletion(statuses: Seq[TaskResult]): Unit = synchronized { completedStatuses ++= statuses } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalHadoopDeltaPathLongSuite.scala 
b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalHadoopDeltaPathLongSuite.scala index dac7216e8..502e37b5c 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalHadoopDeltaPathLongSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalHadoopDeltaPathLongSuite.scala @@ -72,7 +72,7 @@ class JournalHadoopDeltaPathLongSuite extends AnyWordSpec with SparkTestBase wit } private def getJournal(path: String): Journal = { - new JournalHadoopDeltaPath(new Path(path, "journal").toString) + new JournalHadoopDeltaPath(new Path(path, "journal").toString, new Path(path, "executions").toString) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala index d3e0825b1..9d63f85dd 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala @@ -30,8 +30,8 @@ class JournalMongoDbSuite extends AnyWordSpec with MongoDbFixture with BeforeAnd before { if (db != null) { - if (db.doesCollectionExists(collectionName)) { - db.dropCollection(collectionName) + if (db.doesCollectionExists(tasksCollectionName)) { + db.dropCollection(tasksCollectionName) } journal = new JournalMongoDb(connection) } @@ -42,9 +42,9 @@ class JournalMongoDbSuite extends AnyWordSpec with MongoDbFixture with BeforeAnd "Initialize an empty database" in { db.doesCollectionExists("collectionName") - assert(db.doesCollectionExists(collectionName)) + assert(db.doesCollectionExists(tasksCollectionName)) - val indexes = dbRaw.getCollection(collectionName).listIndexes().execute() + val indexes = dbRaw.getCollection(tasksCollectionName).listIndexes().execute() assert(indexes.size == 3) } diff --git 
a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala index a9e53d6b0..d7cbd873b 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala @@ -126,7 +126,7 @@ class HiveHelperSqlSuite extends AnyWordSpec with SparkTestBase with TempDirFixt withTempDirectory("hive_test") { tempDir => val path = getParquetPath(tempDir) - val expected = "ALTER TABLE `db`.`tbl` REPLACE COLUMNS ( `c` INT );".stripMargin + val expected = "ALTER TABLE `db`.`tbl` REPLACE COLUMNS ( `c` INT )".stripMargin val qe = new QueryExecutorMock(tableExists = false) val hiveHelper = new HiveHelperSql(qe, defaultHiveConfig, true) From 5cb1bb95640272caa2d0f0326a8babc38a3be4cc Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 11 May 2026 10:44:43 +0200 Subject: [PATCH 2/7] #738 Add ability for pipelines to set runtime properties that go to the journal of executions. 
--- .../scala/za/co/absa/pramen/api/Pramen.scala | 10 ++++ .../absa/pramen/api/mocks/DummyPramen.scala | 10 ++++ .../za/co/absa/pramen/core/PramenImpl.scala | 10 ++++ .../core/app/config/RuntimeConfig.scala | 11 +++- .../absa/pramen/core/runner/AppRunner.scala | 51 ++++++++++++++++ .../pramen/core/state/PipelineState.scala | 10 ++++ .../pramen/core/state/PipelineStateImpl.scala | 58 ++++++++++++++++--- .../pramen/core/RuntimeConfigFactory.scala | 6 +- .../core/mocks/state/PipelineStateSpy.scala | 32 ++++++++++ 9 files changed, 186 insertions(+), 12 deletions(-) diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala index 2460f3331..18d6b4ff0 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala @@ -75,6 +75,16 @@ trait Pramen { * @return An instance of the TokenLockFactory, which allows for token-based locking functionality. 
*/ def tokenLockFactory: TokenLockFactory + + def setComputeEngineId(computeEngineId: String): Unit + + def setNumberOfExecutorsMin(n: Int): Unit + + def setNumberOfExecutorsMax(n: Int): Unit + + def setExecutorType(executorType: String): Unit + + def setExecutionAdditionalOption(key: String, value: String): Unit } object Pramen { diff --git a/pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala b/pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala index c09cc1c0d..d2d40d2ad 100644 --- a/pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala +++ b/pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala @@ -40,4 +40,14 @@ class DummyPramen extends Pramen { override def setWarningFlag(): Unit = null override def tokenLockFactory: TokenLockFactory = null + + override def setComputeEngineId(computeEngineId: String): Unit = {} + + override def setNumberOfExecutorsMin(n: Int): Unit = {} + + override def setNumberOfExecutorsMax(n: Int): Unit = {} + + override def setExecutorType(executorType: String): Unit = {} + + override def setExecutionAdditionalOption(key: String, value: String): Unit = {} } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala index f1e25e762..24383bcbb 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala @@ -86,6 +86,16 @@ class PramenImpl extends Pramen { throw new IllegalStateException("Token lock factory is not available at the context.") ) + override def setComputeEngineId(computeEngineId: String): Unit = _pipelineState.foreach(_.setComputeEngineId(computeEngineId)) + + override def setNumberOfExecutorsMin(n: Int): Unit = _pipelineState.foreach(_.setNumberOfExecutorsMin(n)) + + override def setNumberOfExecutorsMax(n: Int): Unit = _pipelineState.foreach(_.setNumberOfExecutorsMax(n)) 
+ + override def setExecutorType(executorType: String): Unit = _pipelineState.foreach(_.setExecutorType(executorType)) + + override def setExecutionAdditionalOption(key: String, value: String): Unit = _pipelineState.foreach(_.setExecutionAdditionalOption(key, value)) + private[core] def setWorkflowConfig(config: Config): Unit = synchronized { _workflowConfig = Option(config) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala index 5463666ba..444708c2d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala @@ -48,7 +48,8 @@ case class RuntimeConfig( sparkAppDescriptionTemplate: Option[String], attempt: Int, // Current attempt number for the pipeline run (for auto-retry automation) maxAttempts: Int, // Maximum number of attempts allowed for the pipeline run - forceReCreateHiveTables: Boolean + forceReCreateHiveTables: Boolean, + executionOptions: Map[String, String] ) object RuntimeConfig { @@ -78,6 +79,7 @@ object RuntimeConfig { val ATTEMPT = "pramen.runtime.attempt" val MAX_ATTEMPTS = "pramen.runtime.max.attempts" val FORCE_RECREATE_HIVE_TABLES = "pramen.runtime.hive.force.recreate" + val EXECUTION_EXTRA_OPTIONS_PREFIX = "pramen.execution.option" def fromConfig(conf: Config): RuntimeConfig = { val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP) @@ -144,6 +146,7 @@ object RuntimeConfig { val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE) val attempt = ConfigUtils.getOptionInt(conf, ATTEMPT).getOrElse(1) val maxAttempts = ConfigUtils.getOptionInt(conf, MAX_ATTEMPTS).getOrElse(1) + val executionOptions = ConfigUtils.getExtraOptions(conf, EXECUTION_EXTRA_OPTIONS_PREFIX) RuntimeConfig( isDryRun = isDryRun, @@ -166,7 +169,8 @@ object RuntimeConfig { 
sparkAppDescriptionTemplate, attempt, maxAttempts, - forceReCreateHiveTables = ConfigUtils.getOptionBoolean(conf, FORCE_RECREATE_HIVE_TABLES).getOrElse(false) + forceReCreateHiveTables = ConfigUtils.getOptionBoolean(conf, FORCE_RECREATE_HIVE_TABLES).getOrElse(false), + executionOptions ) } @@ -192,7 +196,8 @@ object RuntimeConfig { sparkAppDescriptionTemplate = None, attempt = 1, maxAttempts = 1, - forceReCreateHiveTables = false + forceReCreateHiveTables = false, + Map.empty ) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala index 0c6ca2872..dd02cfa10 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala @@ -157,7 +157,12 @@ object AppRunner { val hosts = getExecutorNodes hosts.foreach(host => log.info(s"Executor node: $host")) + setMinMaxExecutors(spark, hosts, state) + } else { + setMinMaxExecutors(spark, Seq.empty, state) } + setExecutorNodeType(spark, state) + }, state, "Spark List of executor nodes") } @@ -169,6 +174,52 @@ object AppRunner { data.mapPartitions { _ => Iterable(java.net.InetAddress.getLocalHost.getHostName).iterator }.collect().distinct.sorted } + private[core] def setMinMaxExecutors(implicit spark: SparkSession, executorNodes: Seq[String], state: PipelineState): Unit = { + val executors = if (executorNodes.isEmpty) { + spark.sparkContext.getExecutorMemoryStatus.keySet + .filter(_ != "driver") + } else { + executorNodes + } + + val numExecutors = executors.size + state.setNumberOfExecutorsMax(numExecutors) + + val dynamicAllocEnabled = spark.conf.get("spark.dynamicAllocation.enabled", "false").toBoolean + + if (dynamicAllocEnabled) { + state.setNumberOfExecutorsMin(1) + } else { + state.setNumberOfExecutorsMin(numExecutors) + } + } + + private[core] def setExecutorNodeType(implicit spark: SparkSession, state: 
PipelineState): Unit = { + // Get first executor and construct a string like: + // C32M64 meaning 32 virtual CPUs and 64 GB of memory + try { + val executorMemoryStatus = spark.sparkContext.getExecutorMemoryStatus + val executorEntries = executorMemoryStatus.filterKeys(_ != "driver") + + if (executorEntries.nonEmpty) { + val (_, (maxMemory, _)) = executorEntries.head + val memoryGb = maxMemory / (1024L * 1024L * 1024L) + + val cores = spark.conf.get("spark.executor.cores", "0").toInt + val maxThreads = if (cores == 0) Runtime.getRuntime.availableProcessors() else cores + + val nodeType = s"C${maxThreads}M$memoryGb" + log.info(s"Executor node type: $nodeType") + state.setExecutorType(nodeType) + } else { + log.warn("No executors found to determine node type.") + } + } catch { + case ex: Exception => + log.warn(s"Unable to determine executor node type: ${ex.getMessage}") + } + } + private[core] def logBanner(implicit spark: SparkSession): Try[Unit] = { if (!bannerShown) { Try { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala index a7fdb6a64..d1bf87583 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala @@ -36,6 +36,16 @@ trait PipelineState extends AutoCloseable { def setJournal(journal: Journal): Unit + def setComputeEngineId(computeEngineId: String): Unit + + def setNumberOfExecutorsMin(n: Int): Unit + + def setNumberOfExecutorsMax(n: Int): Unit + + def setExecutorType(executorType: String): Unit + + def setExecutionAdditionalOption(key: String, value: String): Unit + def addTaskCompletion(statuses: Seq[TaskResult]): Unit def getExitCode: Int diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala index 
afa7f39e7..e19bd929b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala @@ -16,6 +16,8 @@ package za.co.absa.pramen.core.state +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.typesafe.config.Config import org.slf4j.{Logger, LoggerFactory} import sun.misc.Signal @@ -36,6 +38,7 @@ import za.co.absa.pramen.core.pipeline.PipelineDef._ import za.co.absa.pramen.core.utils.{ConfigUtils, JvmUtils} import java.time.Instant +import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} @@ -65,6 +68,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification private val taskResults = new ListBuffer[TaskResult] private val pipelineNotificationFailures = new ListBuffer[PipelineNotificationFailure] private val signalHandlers = new ListBuffer[PramenSignalHandler] + private val executionAdditionalOptions: mutable.Map[String, String] = new mutable.HashMap[String, String] ++ runtimeConfig.executionOptions @volatile private var failureException: Option[Throwable] = None @volatile private var signalException: Option[Throwable] = None @volatile private var exitedNormally = false @@ -73,6 +77,10 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification @volatile private var sparkAppId: Option[String] = None @volatile private var warningFlag: Boolean = false @volatile private var journalOpt: Option[Journal] = None + @volatile private var computeEngineId: Option[String] = None + @volatile private var numberOfExecutorsMin: Option[Int] = None + @volatile private var numberOfExecutorsMax: Option[Int] = None + @volatile private var executorType: Option[String] = None init() @@ -185,6 +193,32 @@ class PipelineStateImpl(implicit conf: Config, 
notificationBuilder: Notification this.journalOpt = Option(journal) } + override def setComputeEngineId(computeEngineIdIn: String): Unit = synchronized { + computeEngineId = Option(computeEngineIdIn) + } + + override def setNumberOfExecutorsMin(nIn: Int): Unit = synchronized { + numberOfExecutorsMin = Option(nIn) + if (numberOfExecutorsMax.exists(_ < nIn)) { + numberOfExecutorsMax = Option(nIn) + } + } + + override def setNumberOfExecutorsMax(nIn: Int): Unit = synchronized { + numberOfExecutorsMax = Option(nIn) + if (numberOfExecutorsMin.exists(_ > nIn)) { + numberOfExecutorsMin = Option(nIn) + } + } + + override def setExecutorType(executorTypeIn: String): Unit = synchronized { + executorType = Option(executorTypeIn) + } + + override def setExecutionAdditionalOption(key: String, value: String): Unit = synchronized { + executionAdditionalOptions.put(key, value) + } + override def addTaskCompletion(statuses: Seq[TaskResult]): Unit = synchronized { taskResults ++= statuses.filter(_.runStatus != NotRan) if (statuses.exists(_.runStatus.isFailure)) { @@ -294,28 +328,38 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification environmentName, batchId, sparkAppId.getOrElse(""), - None, + computeEngineId, tenant, country, runtimeConfig.runDate.toString, runtimeConfig.runDateTo.map(_.toString), startedInstant.getEpochSecond, finishedInstant.getOrElse(Instant.now()).getEpochSecond, - None, - None, - None, + numberOfExecutorsMin, + numberOfExecutorsMax, + executorType, pipelineInfo.status.toString, runtimeConfig.isRerun || runtimeConfig.historicalRunMode == RunMode.ForceRun, - 1, - 1, + runtimeConfig.attempt, + runtimeConfig.maxAttempts, pipelineInfo.failureException.map(_.getMessage.take(1000)), - None + getExecutionAdditionalOptions ) journal.addPipelineEntry(execution) } } + private def getExecutionAdditionalOptions: Option[String] = { + if (executionAdditionalOptions.isEmpty) + None + else { + val mapper = new ObjectMapper() + 
mapper.registerModule(DefaultScalaModule) + Some(mapper.writeValueAsString(executionAdditionalOptions.toMap)) + } + } + protected def sendNotificationEmail(): Unit = { failureException.foreach(ex => log.error(s"The job has FAILED.", ex)) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala index 28edb251c..ec3a18a4e 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala @@ -43,7 +43,8 @@ object RuntimeConfigFactory { sparkAppDescriptionTemplate: Option[String] = None, attempt: Int = 1, maxAttempts: Int = 1, - forceReCreateHiveTables: Boolean = false): RuntimeConfig = { + forceReCreateHiveTables: Boolean = false, + executionOptions: Map[String, String] = Map.empty): RuntimeConfig = { RuntimeConfig(isDryRun, isRerun, runTables, @@ -64,7 +65,8 @@ object RuntimeConfigFactory { sparkAppDescriptionTemplate, attempt, maxAttempts, - forceReCreateHiveTables) + forceReCreateHiveTables, + executionOptions) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala index 73659f595..fdc3aa126 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala @@ -21,6 +21,7 @@ import za.co.absa.pramen.core.journal.Journal import za.co.absa.pramen.core.mocks.{PipelineInfoFactory, PipelineStateSnapshotFactory} import za.co.absa.pramen.core.state.PipelineState +import scala.collection.mutable import scala.collection.mutable.ListBuffer class PipelineStateSpy extends PipelineState { @@ -34,6 +35,11 @@ class PipelineStateSpy extends PipelineState { var closeCalled = 0 var sparkAppId: Option[String] = None 
var journalOpt: Option[Journal] = None + val executionAdditionalOptions: mutable.Map[String, String] = new mutable.HashMap[String, String] + var computeEngineId: Option[String] = None + var numberOfExecutorsMin: Option[Int] = None + var numberOfExecutorsMax: Option[Int] = None + var executorType: Option[String] = None override def getState: PipelineStateSnapshot = { PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(PipelineInfoFactory.getDummyPipelineInfo(sparkApplicationId = sparkAppId), @@ -68,6 +74,32 @@ class PipelineStateSpy extends PipelineState { this.journalOpt = Option(journal) } + override def setComputeEngineId(computeEngineIdIn: String): Unit = synchronized { + computeEngineId = Option(computeEngineIdIn) + } + + override def setNumberOfExecutorsMin(nIn: Int): Unit = synchronized { + numberOfExecutorsMin = Option(nIn) + if (numberOfExecutorsMax.exists(_ < nIn)) { + numberOfExecutorsMax = Option(nIn) + } + } + + override def setNumberOfExecutorsMax(nIn: Int): Unit = synchronized { + numberOfExecutorsMax = Option(nIn) + if (numberOfExecutorsMin.exists(_ > nIn)) { + numberOfExecutorsMin = Option(nIn) + } + } + + override def setExecutorType(executorTypeIn: String): Unit = synchronized { + executorType = Option(executorTypeIn) + } + + override def setExecutionAdditionalOption(key: String, value: String): Unit = synchronized { + executionAdditionalOptions.put(key, value) + } + override def addTaskCompletion(statuses: Seq[TaskResult]): Unit = synchronized { completedStatuses ++= statuses } From 9e3cb5f3e0ef388286a44426c6f917c329d30814 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 11 May 2026 11:52:18 +0200 Subject: [PATCH 3/7] #738 Add DynamoDB support for the executions table. 
--- .../pramen/core/journal/JournalDynamoDB.scala | 158 +++++++++++++++--- 1 file changed, 138 insertions(+), 20 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala index df27c07f7..0bf463620 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala @@ -51,9 +51,12 @@ class JournalDynamoDB private ( private val journalTableBaseName = s"${tablePrefix}_${JournalDynamoDB.DEFAULT_JOURNAL_TABLE}" private val journalTableFullName = BookkeeperDynamoDb.getFullTableName(tableArn, journalTableBaseName) + private val executionsTableBaseName = s"${tablePrefix}_${JournalDynamoDB.DEFAULT_EXECUTIONS_TABLE}" + private val executionsTableFullName = BookkeeperDynamoDb.getFullTableName(tableArn, executionsTableBaseName) - // Initialize table on creation + // Initialize tables on creation createJournalTableIfNotExists() + createExecutionsTableIfNotExists() /** * Add a task completion entry to the journal. 
@@ -120,7 +123,43 @@ class JournalDynamoDB private ( } override def addPipelineEntry(execution: Execution): Unit = { - // ToDo add the implementation for DynamoDB + val itemBuilder = Map.newBuilder[String, AttributeValue] + + // Primary key: composite of pipelineName and batchId + itemBuilder += (JournalDynamoDB.ATTR_EXEC_PIPELINE_NAME -> AttributeValue.builder().s(execution.pipelineName).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_BATCH_ID -> AttributeValue.builder().n(execution.batchId.toString).build()) + + itemBuilder += (JournalDynamoDB.ATTR_EXEC_PIPELINE_ID -> AttributeValue.builder().s(execution.pipelineId).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_ENVIRONMENT_NAME -> AttributeValue.builder().s(execution.environmentName).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_SPARK_APP_ID -> AttributeValue.builder().s(execution.sparkApplicationId).build()) + execution.computeEngineId.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_COMPUTE_ENGINE_ID -> AttributeValue.builder().s(v).build())) + execution.tenant.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_TENANT -> AttributeValue.builder().s(v).build())) + execution.country.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_COUNTRY -> AttributeValue.builder().s(v).build())) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_RUN_DATE_FROM -> AttributeValue.builder().s(execution.runDateFrom).build()) + execution.runDateTo.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_RUN_DATE_TO -> AttributeValue.builder().s(v.format(dateFormatter)).build())) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_STARTED_AT -> AttributeValue.builder().n(execution.startedAt.toString).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_FINISHED_AT -> AttributeValue.builder().n(execution.finishedAt.toString).build()) + execution.numberOfExecutorsMin.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_NUMBER_OF_EXECUTORS_MIN -> AttributeValue.builder().n(v.toString).build())) + 
execution.numberOfExecutorsMax.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_NUMBER_OF_EXECUTORS_MAX -> AttributeValue.builder().n(v.toString).build())) + execution.executorType.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_EXECUTOR_TYPE -> AttributeValue.builder().s(v).build())) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_STATUS -> AttributeValue.builder().s(execution.status).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_IS_RERUN -> AttributeValue.builder().s(execution.isRerun.toString).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_ATTEMPT_NUMBER -> AttributeValue.builder().s(execution.attemptNumber.toString).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_NUMBER_OF_ATTEMPTS -> AttributeValue.builder().s(execution.numberOfAttempts.toString).build()) + execution.failureReason.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_FAILURE_REASON -> AttributeValue.builder().s(v).build())) + execution.additionalOptions.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_ADDITIONAL_OPTIONS -> AttributeValue.builder().s(v).build())) + + try { + val putRequest = PutItemRequest.builder() + .tableName(executionsTableFullName) + .item(itemBuilder.result().asJava) + .build() + + dynamoDbClient.putItem(putRequest) + } catch { + case NonFatal(ex) => + log.error(s"Unable to write to the executions table '$executionsTableFullName'.", ex) + } } /** @@ -181,6 +220,61 @@ class JournalDynamoDB private ( } } + /** + * Creates the executions table if it doesn't exist. 
+ */ + private def createExecutionsTableIfNotExists(): Unit = { + try { + val describeRequest = DescribeTableRequest.builder() + .tableName(executionsTableFullName) + .build() + + dynamoDbClient.describeTable(describeRequest) + log.info(s"Executions table '$executionsTableFullName' already exists") + } catch { + case _: ResourceNotFoundException => + log.info(s"Creating executions table '$executionsTableFullName'") + createExecutionsTable() + case NonFatal(ex) => + log.error(s"Error checking if executions table exists", ex) + throw ex + } + } + + /** + * Creates the executions table in DynamoDB. + */ + private def createExecutionsTable(): Unit = { + val createRequest = CreateTableRequest.builder() + .tableName(executionsTableFullName) + .attributeDefinitions( + AttributeDefinition.builder() + .attributeName(JournalDynamoDB.ATTR_EXEC_PIPELINE_NAME) + .attributeType(ScalarAttributeType.S) + .build(), + AttributeDefinition.builder() + .attributeName(JournalDynamoDB.ATTR_EXEC_BATCH_ID) + .attributeType(ScalarAttributeType.N) + .build() + ) + .keySchema( + KeySchemaElement.builder() + .attributeName(JournalDynamoDB.ATTR_EXEC_PIPELINE_NAME) + .keyType(KeyType.HASH) + .build(), + KeySchemaElement.builder() + .attributeName(JournalDynamoDB.ATTR_EXEC_BATCH_ID) + .keyType(KeyType.RANGE) + .build() + ) + .billingMode(BillingMode.PAY_PER_REQUEST) + .build() + + dynamoDbClient.createTable(createRequest) + waitForTableActive(executionsTableFullName, dynamoDbClient) + log.info(s"Executions table '$executionsTableFullName' created successfully") + } + /** * Creates the journal table if it doesn't exist. 
*/ @@ -251,31 +345,55 @@ class JournalDynamoDB private ( object JournalDynamoDB { val DEFAULT_JOURNAL_TABLE = "journal" + val DEFAULT_EXECUTIONS_TABLE = "executions" val DEFAULT_TABLE_PREFIX = "pramen" // Maximum length for failure reason (4KB minus some overhead) val MAX_FAILURE_REASON_LENGTH = 4000 + // Attribute names for executions table + val ATTR_EXEC_PIPELINE_ID = "pipeline_id" + val ATTR_EXEC_PIPELINE_NAME = "pipeline_name" + val ATTR_EXEC_ENVIRONMENT_NAME = "environment_name" + val ATTR_EXEC_BATCH_ID = "batch_id" + val ATTR_EXEC_SPARK_APP_ID = "spark_application_id" + val ATTR_EXEC_COMPUTE_ENGINE_ID = "compute_engine_id" + val ATTR_EXEC_TENANT = "tenant" + val ATTR_EXEC_COUNTRY = "country" + val ATTR_EXEC_RUN_DATE_FROM = "run_date_from" + val ATTR_EXEC_RUN_DATE_TO = "run_date_to" + val ATTR_EXEC_STARTED_AT = "started_at" + val ATTR_EXEC_FINISHED_AT = "finished_at" + val ATTR_EXEC_NUMBER_OF_EXECUTORS_MIN = "number_of_executors_min" + val ATTR_EXEC_NUMBER_OF_EXECUTORS_MAX = "number_of_executors_max" + val ATTR_EXEC_EXECUTOR_TYPE = "executor_type" + val ATTR_EXEC_STATUS = "status" + val ATTR_EXEC_IS_RERUN = "is_rerun" + val ATTR_EXEC_ATTEMPT_NUMBER = "attempt_number" + val ATTR_EXEC_NUMBER_OF_ATTEMPTS = "number_of_attempts" + val ATTR_EXEC_FAILURE_REASON = "failure_reason" + val ATTR_EXEC_ADDITIONAL_OPTIONS = "additional_options" + // Attribute names for journal table - val ATTR_JOB_NAME = "jobName" - val ATTR_TABLE_NAME = "tableName" - val ATTR_PERIOD_BEGIN = "periodBegin" - val ATTR_PERIOD_END = "periodEnd" - val ATTR_INFO_DATE = "infoDate" - val ATTR_INPUT_RECORD_COUNT = "inputRecordCount" - val ATTR_INPUT_RECORD_COUNT_OLD = "inputRecordCountOld" - val ATTR_OUTPUT_RECORD_COUNT = "outputRecordCount" - val ATTR_OUTPUT_RECORD_COUNT_OLD = "outputRecordCountOld" - val ATTR_APPENDED_RECORD_COUNT = "appendedRecordCount" - val ATTR_OUTPUT_SIZE = "outputSize" - val ATTR_STARTED_AT = "startedAt" - val ATTR_FINISHED_AT = "finishedAt" + val ATTR_JOB_NAME = "job_name" 
+ val ATTR_TABLE_NAME = "table_name" + val ATTR_PERIOD_BEGIN = "period_begin" + val ATTR_PERIOD_END = "period_end" + val ATTR_INFO_DATE = "info_date" + val ATTR_INPUT_RECORD_COUNT = "input_record_count" + val ATTR_INPUT_RECORD_COUNT_OLD = "input_record_count_old" + val ATTR_OUTPUT_RECORD_COUNT = "output_record_count" + val ATTR_OUTPUT_RECORD_COUNT_OLD = "output_record_count_old" + val ATTR_APPENDED_RECORD_COUNT = "appended_record_count" + val ATTR_OUTPUT_SIZE = "output_size" + val ATTR_STARTED_AT = "started_at" + val ATTR_FINISHED_AT = "finished_at" val ATTR_STATUS = "status" - val ATTR_FAILURE_REASON = "failureReason" - val ATTR_SPARK_APP_ID = "sparkApplicationId" - val ATTR_PIPELINE_ID = "pipelineId" - val ATTR_PIPELINE_NAME = "pipelineName" - val ATTR_ENVIRONMENT_NAME = "environmentName" + val ATTR_FAILURE_REASON = "failure_reason" + val ATTR_SPARK_APP_ID = "spark_application_id" + val ATTR_PIPELINE_ID = "pipeline_id" + val ATTR_PIPELINE_NAME = "pipeline_name" + val ATTR_ENVIRONMENT_NAME = "environment_name" val ATTR_TENANT = "tenant" val ATTR_COUNTRY = "country" val ATTR_BATCH_ID = "batchId" From 4cd8df77b00f9086fab0a51c40666c41ddcfed79 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 11 May 2026 13:20:28 +0200 Subject: [PATCH 4/7] #738 Switch to snake case for DynamoDB table attributes to be the same as for JDBC bookkeeping tables.
--- .../core/bookkeeper/BookkeeperDynamoDb.scala | 24 +++++++++---------- .../bookkeeper/OffsetManagerDynamoDb.scala | 18 +++++++------- .../pramen/core/lock/TokenLockDynamoDb.scala | 4 ++-- .../metadata/MetadataManagerDynamoDb.scala | 12 +++++----- 4 files changed, 29 insertions(+), 29 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala index 734a9ff23..3dda811e8 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala @@ -792,20 +792,20 @@ object BookkeeperDynamoDb { val DEFAULT_TABLE_PREFIX = "pramen" // Attribute names for bookkeeping table - val ATTR_TABLE_NAME = "tableName" - val ATTR_INFO_DATE = "infoDate" - val ATTR_INFO_DATE_SORT_KEY = "infoDateSortKey" // Composite: "infoDate#jobFinished" - val ATTR_INFO_DATE_BEGIN = "infoDateBegin" - val ATTR_INFO_DATE_END = "infoDateEnd" - val ATTR_INPUT_RECORD_COUNT = "inputRecordCount" - val ATTR_OUTPUT_RECORD_COUNT = "outputRecordCount" - val ATTR_JOB_STARTED = "jobStarted" - val ATTR_JOB_FINISHED = "jobFinished" - val ATTR_BATCH_ID = "batchId" - val ATTR_APPENDED_RECORD_COUNT = "appendedRecordCount" + val ATTR_TABLE_NAME = "table_name" + val ATTR_INFO_DATE = "info_date" + val ATTR_INFO_DATE_SORT_KEY = "info_date_sort_key" // Composite: "infoDate#jobFinished" + val ATTR_INFO_DATE_BEGIN = "info_date_begin" + val ATTR_INFO_DATE_END = "info_date_end" + val ATTR_INPUT_RECORD_COUNT = "input_record_count" + val ATTR_OUTPUT_RECORD_COUNT = "output_record_count" + val ATTR_JOB_STARTED = "job_started" + val ATTR_JOB_FINISHED = "job_finished" + val ATTR_BATCH_ID = "batch_id" + val ATTR_APPENDED_RECORD_COUNT = "appended_record_count" // Attribute names for schema table - val ATTR_SCHEMA_JSON = "schemaJson" + val ATTR_SCHEMA_JSON = "schema_json" val 
MODEL_VERSION = 1 diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala index 0c5f872d8..36627538d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala @@ -555,15 +555,15 @@ object OffsetManagerDynamoDb { val DEFAULT_TABLE_PREFIX = "pramen" // Attribute names for offset table - val ATTR_PRAMEN_TABLE_NAME = "pramenTableName" - val ATTR_COMPOSITE_KEY = "compositeKey" // Format: "infoDate#createdAtMilli" - val ATTR_INFO_DATE = "infoDate" - val ATTR_DATA_TYPE = "dataType" - val ATTR_MIN_OFFSET = "minOffset" - val ATTR_MAX_OFFSET = "maxOffset" - val ATTR_BATCH_ID = "batchId" - val ATTR_CREATED_AT = "createdAt" - val ATTR_COMMITTED_AT = "committedAt" + val ATTR_PRAMEN_TABLE_NAME = "pramen_table_name" + val ATTR_COMPOSITE_KEY = "composite_key" // Format: "infoDate#createdAtMilli" + val ATTR_INFO_DATE = "info_date" + val ATTR_DATA_TYPE = "data_type" + val ATTR_MIN_OFFSET = "min_offset" + val ATTR_MAX_OFFSET = "max_offset" + val ATTR_BATCH_ID = "batch_id" + val ATTR_CREATED_AT = "created_at" + val ATTR_COMMITTED_AT = "committed_at" /** * Builder for creating OffsetManagerDynamoDb instances. 
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala index 454e90900..10dafdc9f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala @@ -33,8 +33,8 @@ object TokenLockDynamoDb { // Attribute names val ATTR_TOKEN = "job_token" // 'token' is a reserved word in DynamoDb and can't be used as an attribute val ATTR_OWNER = "job_owner" // 'owner' is a reserved word in DynamoDb and can't be used as an attribute - val ATTR_EXPIRES = "expiresAt" - val ATTR_CREATED_AT = "createdAt" + val ATTR_EXPIRES = "expires_at" + val ATTR_CREATED_AT = "created_at" val TICKETS_HARD_EXPIRE_DAYS = 1 } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala index 31b8a8932..25e62017e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala @@ -248,12 +248,12 @@ object MetadataManagerDynamoDb { val DEFAULT_TABLE_PREFIX = "pramen" // Attribute names for metadata table - val ATTR_COMPOSITE_KEY = "compositeKey" // tableName#infoDate - val ATTR_METADATA_KEY = "metadataKey" - val ATTR_METADATA_VALUE = "metadataValue" - val ATTR_LAST_UPDATED = "lastUpdated" - val ATTR_TABLE_NAME = "tableName" // For filtering/queries - val ATTR_INFO_DATE = "infoDate" // For filtering/queries + val ATTR_COMPOSITE_KEY = "composite_key" // tableName#infoDate + val ATTR_METADATA_KEY = "metadata_key" + val ATTR_METADATA_VALUE = "metadata_value" + val ATTR_LAST_UPDATED = "last_updated" + val ATTR_TABLE_NAME = "table_name" // For filtering/queries + val ATTR_INFO_DATE = "info_date" // For filtering/queries /** * 
Builder for creating MetadataManagerDynamoDb instances. From 3f4a463c1378ab67bfea00af89dcfae61ff9eb8e Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 11 May 2026 14:52:10 +0200 Subject: [PATCH 5/7] #738 Add support for executor and other properties to be added to the 'executions' table. --- .../absa/pramen/core/runner/AppRunner.scala | 106 +++++++++++++----- .../pramen/core/state/PipelineStateImpl.scala | 12 +- .../core/tests/runner/AppRunnerSuite.scala | 52 +++++++++ 3 files changed, 142 insertions(+), 28 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala index dd02cfa10..ef0c8d408 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala @@ -157,11 +157,10 @@ object AppRunner { val hosts = getExecutorNodes hosts.foreach(host => log.info(s"Executor node: $host")) - setMinMaxExecutors(spark, hosts, state) - } else { - setMinMaxExecutors(spark, Seq.empty, state) } setExecutorNodeType(spark, state) + setMinMaxExecutors(spark, state) + setExecutionAdditionalProperties(spark, state) }, state, "Spark List of executor nodes") } @@ -174,50 +173,103 @@ object AppRunner { data.mapPartitions { _ => Iterable(java.net.InetAddress.getLocalHost.getHostName).iterator }.collect().distinct.sorted } - private[core] def setMinMaxExecutors(implicit spark: SparkSession, executorNodes: Seq[String], state: PipelineState): Unit = { - val executors = if (executorNodes.isEmpty) { - spark.sparkContext.getExecutorMemoryStatus.keySet + private[core] def setMinMaxExecutors(implicit spark: SparkSession, state: PipelineState): Unit = { + val executors = spark.sparkContext.getExecutorMemoryStatus.keySet .filter(_ != "driver") - } else { - executorNodes + + val maxNumExecutors = spark.conf.getOption("spark.executor.instances").orElse( + 
spark.conf.getOption("spark.dynamicAllocation.maxExecutors")) match { + case Some(s) => s.toInt + case None => executors.size } - val numExecutors = executors.size - state.setNumberOfExecutorsMax(numExecutors) + state.setNumberOfExecutorsMax(maxNumExecutors) val dynamicAllocEnabled = spark.conf.get("spark.dynamicAllocation.enabled", "false").toBoolean - if (dynamicAllocEnabled) { - state.setNumberOfExecutorsMin(1) + val minNumExecutors = if (dynamicAllocEnabled) { + spark.conf.getOption("spark.dynamicAllocation.minExecutors") match { + case Some(s) => s.toInt + case None => 1 + } } else { - state.setNumberOfExecutorsMin(numExecutors) + maxNumExecutors } + + state.setNumberOfExecutorsMin(minNumExecutors) } - private[core] def setExecutorNodeType(implicit spark: SparkSession, state: PipelineState): Unit = { - // Get first executor and construct a string like: - // C32M64 meaning 32 virtual CPUs and 64 GB of memory - try { + private[core] def getNumberOfExecutorCores(spark: SparkSession): Int = { + spark.conf.getOption("spark.executor.cores") match { + case Some(s) => s.toInt + case None => Runtime.getRuntime.availableProcessors() + } + } + + private[core] def getNumberOfExecutorMemoryGb(spark: SparkSession): Int = { + val memAttempt1 = spark.conf.getOption("spark.executor.memory") match { + case Some(s) => parseMemorySizeInGb(s) + case None => None + } + + memAttempt1.getOrElse { val executorMemoryStatus = spark.sparkContext.getExecutorMemoryStatus val executorEntries = executorMemoryStatus.filterKeys(_ != "driver") if (executorEntries.nonEmpty) { val (_, (maxMemory, _)) = executorEntries.head val memoryGb = maxMemory / (1024L * 1024L * 1024L) + memoryGb.toInt + } else { + log.warn("No executors found to determine the amount of memory of the executor") + 0 + } + } + } - val cores = spark.conf.get("spark.executor.cores", "0").toInt - val maxThreads = if (cores == 0) Runtime.getRuntime.availableProcessors() else cores - - val nodeType = s"C${maxThreads}M$memoryGb" - 
log.info(s"Executor node type: $nodeType") - state.setExecutorType(nodeType) + private[core] def parseMemorySizeInGb(s: String): Option[Int] = { + val trimmed = s.trim.toLowerCase + val memoryGb = Try { + if (trimmed.endsWith("g")) { + trimmed.dropRight(1).toDouble + } else if (trimmed.endsWith("m")) { + trimmed.dropRight(1).toDouble / 1024.0 + } else if (trimmed.endsWith("k")) { + trimmed.dropRight(1).toDouble / (1024.0 * 1024.0) + } else if (trimmed.endsWith("t")) { + trimmed.dropRight(1).toDouble * 1024.0 } else { - log.warn("No executors found to determine node type.") + trimmed.toDouble / (1024.0 * 1024.0 * 1024.0) } - } catch { - case ex: Exception => - log.warn(s"Unable to determine executor node type: ${ex.getMessage}") } + memoryGb.map(Math.round(_).toInt).toOption + } + + private[core] def setExecutionAdditionalProperties(implicit spark: SparkSession, state: PipelineState): Unit = { + spark.conf.getOption("spark.glue.JOB_RUN_ID").foreach { glueId => + state.setComputeEngineId(glueId) + } + + spark.conf.getOption("spark.glue.GLUE_VERSION").foreach { glueVersion => + state.setExecutionAdditionalOption("glue_version", glueVersion) + } + + spark.conf.getOption("spark.glue.accountId").foreach { awsAccount => + state.setExecutionAdditionalOption("aws_account_id", awsAccount) + } + + spark.conf.getOption("spark.glue.JOB_NAME").foreach { glueJobName => + state.setExecutionAdditionalOption("glue_job_name", glueJobName) + } + } + + private[core] def setExecutorNodeType(implicit spark: SparkSession, state: PipelineState): Unit = { + // Get first executor and construct a string like: + // C32M64 meaning 32 virtual CPUs and 64 GB of memory + val cpus = getNumberOfExecutorCores(spark) + val memoryGb = getNumberOfExecutorMemoryGb(spark) + + state.setExecutorType(s"C${cpus}M$memoryGb") } private[core] def logBanner(implicit spark: SparkSession): Try[Unit] = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala index e19bd929b..714cddeae 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala @@ -321,6 +321,16 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification protected def addJournalEntry(): Unit = { val pipelineInfo = getPipelineInfo + val failureReason = if (pipelineInfo.status == PipelineStatus.Success || pipelineInfo.status == PipelineStatus.Warning) { + None + } else { + pipelineInfo.failureException match { + case Some(ex) => Option(RunStatus.getShortExceptionDescription(ex)) + case None => + val firstReason = taskResults.map(_.runStatus).find(_.isFailure).map(_.getReason.getOrElse("")) + firstReason + } + } journalOpt.foreach { journal => val execution = Execution( pipelineId, @@ -342,7 +352,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification runtimeConfig.isRerun || runtimeConfig.historicalRunMode == RunMode.ForceRun, runtimeConfig.attempt, runtimeConfig.maxAttempts, - pipelineInfo.failureException.map(_.getMessage.take(1000)), + failureReason.map(_.take(1000)), getExecutionAdditionalOptions ) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/AppRunnerSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/AppRunnerSuite.scala index 92e6f4977..b741ec11f 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/AppRunnerSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/AppRunnerSuite.scala @@ -246,6 +246,58 @@ class AppRunnerSuite extends AnyWordSpec with SparkTestBase { } } + "setMinMaxExecutors" should { + "set min and max executors on the pipeline state" in { + val state = new PipelineStateSpy + + AppRunner.setMinMaxExecutors(spark, state) + + assert(state.numberOfExecutorsMin.isDefined) + 
assert(state.numberOfExecutorsMax.isDefined) + assert(state.numberOfExecutorsMin.get >= 1) + assert(state.numberOfExecutorsMax.get >= state.numberOfExecutorsMin.get) + } + } + + "getNumberOfExecutorCores" should { + "return the number of executor cores from the Spark session" in { + val cores = AppRunner.getNumberOfExecutorCores(spark) + + assert(cores >= 1) + } + } + + "getNumberOfExecutorMemoryGb" should { + "return the executor memory in GB from the Spark session" in { + val memoryGb = AppRunner.getNumberOfExecutorMemoryGb(spark) + + assert(memoryGb > 0.0) + } + } + + "parseMemorySizeInGb" should { + "parse gigabytes" in { + assert(AppRunner.parseMemorySizeInGb("2g").get == 2) + assert(AppRunner.parseMemorySizeInGb("4G").get == 4) + } + + "parse megabytes" in { + assert(AppRunner.parseMemorySizeInGb("1024m").get == 1) + assert(AppRunner.parseMemorySizeInGb("2048M").get == 2) + assert(AppRunner.parseMemorySizeInGb("512m").get == 1) + } + + "parse terabytes" in { + assert(AppRunner.parseMemorySizeInGb("1t").get == 1024) + assert(AppRunner.parseMemorySizeInGb("2T").get == 2048) + } + + "parse kilobytes" in { + assert(AppRunner.parseMemorySizeInGb("1048576k").get == 1) + assert(AppRunner.parseMemorySizeInGb("1048576K").get == 1) + } + } + "handleFailure()" should { val state = getMockPipelineState From 8a6897686e6098eecbf5215e025b83a2295ad0cd Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 12 May 2026 07:06:08 +0200 Subject: [PATCH 6/7] #738 Add number of records and maximum width of schema to the executions journal. 
--- .../scala/za/co/absa/pramen/api/Pramen.scala | 6 +++++ .../absa/pramen/api/mocks/DummyPramen.scala | 6 +++++ .../za/co/absa/pramen/core/PramenImpl.scala | 6 +++++ .../pramen/core/journal/JournalDynamoDB.scala | 5 ++++ .../pramen/core/journal/model/Execution.scala | 2 ++ .../core/journal/model/ExecutionsTable.scala | 25 ++++++++++++++++--- .../pramen/core/pipeline/IngestionJob.scala | 13 +++++++++- .../absa/pramen/core/runner/AppRunner.scala | 9 +++++-- .../core/runner/task/TaskRunnerBase.scala | 5 ++-- .../pramen/core/state/PipelineState.scala | 6 +++++ .../pramen/core/state/PipelineStateImpl.scala | 16 ++++++++++++ .../absa/pramen/core/utils/SparkUtils.scala | 19 ++++++++++++++ .../core/mocks/state/PipelineStateSpy.scala | 14 +++++++++++ 13 files changed, 123 insertions(+), 9 deletions(-) diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala index 18d6b4ff0..c7e1de1f8 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala @@ -84,6 +84,12 @@ trait Pramen { def setExecutorType(executorType: String): Unit + def setNumberOfRecordsIngested(count: Long): Unit + + def addNumberOfRecordsIngested(count: Long): Unit + + def setMaximumNumberOfColumns(count: Long): Unit + def setExecutionAdditionalOption(key: String, value: String): Unit } diff --git a/pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala b/pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala index d2d40d2ad..cb9b6b131 100644 --- a/pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala +++ b/pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala @@ -49,5 +49,11 @@ class DummyPramen extends Pramen { override def setExecutorType(executorType: String): Unit = {} + override def setNumberOfRecordsIngested(count: Long): Unit = {} + + override def 
addNumberOfRecordsIngested(count: Long): Unit = {} + + override def setMaximumNumberOfColumns(count: Long): Unit = {} + override def setExecutionAdditionalOption(key: String, value: String): Unit = {} } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala index 24383bcbb..3be5318e6 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala @@ -94,6 +94,12 @@ class PramenImpl extends Pramen { override def setExecutorType(executorType: String): Unit = _pipelineState.foreach(_.setExecutorType(executorType)) + override def setNumberOfRecordsIngested(count: Long): Unit = _pipelineState.foreach(_.setNumberOfRecordsIngested(count)) + + override def addNumberOfRecordsIngested(count: Long): Unit = _pipelineState.foreach(_.addNumberOfRecordsIngested(count)) + + override def setMaximumNumberOfColumns(count: Long): Unit = _pipelineState.foreach(_.setMaximumNumberOfColumns(count)) + override def setExecutionAdditionalOption(key: String, value: String): Unit = _pipelineState.foreach(_.setExecutionAdditionalOption(key, value)) private[core] def setWorkflowConfig(config: Config): Unit = synchronized { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala index 0bf463620..b8bdb2b6a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala @@ -147,6 +147,8 @@ class JournalDynamoDB private ( itemBuilder += (JournalDynamoDB.ATTR_EXEC_ATTEMPT_NUMBER -> AttributeValue.builder().s(execution.attemptNumber.toString).build()) itemBuilder += (JournalDynamoDB.ATTR_EXEC_NUMBER_OF_ATTEMPTS -> AttributeValue.builder().s(execution.numberOfAttempts.toString).build()) 
execution.failureReason.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_FAILURE_REASON -> AttributeValue.builder().s(v).build())) + execution.numberOfRecordsIngested.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_NUMBER_OR_RECORDS_INGESTED -> AttributeValue.builder().n(v.toString).build())) + execution.maxNumberOfColumns.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_MAX_NUMBER_OF_COLUMNS -> AttributeValue.builder().n(v.toString).build())) execution.additionalOptions.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_ADDITIONAL_OPTIONS -> AttributeValue.builder().s(v).build())) try { @@ -372,6 +374,9 @@ object JournalDynamoDB { val ATTR_EXEC_ATTEMPT_NUMBER = "attempt_number" val ATTR_EXEC_NUMBER_OF_ATTEMPTS = "number_of_attempts" val ATTR_EXEC_FAILURE_REASON = "failure_reason" + val ATTR_EXEC_NUMBER_OR_RECORDS_INGESTED = "number_of_records_ingested" + val ATTR_EXEC_MAX_NUMBER_OF_COLUMNS = "max_number_of_columns" + val ATTR_EXEC_ADDITIONAL_OPTIONS = "additional_options" // Attribute names for journal table diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala index fbe4d7c60..30497b4e2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala @@ -37,5 +37,7 @@ case class Execution( attemptNumber: Int, numberOfAttempts: Int, failureReason: Option[String], + numberOfRecordsIngested: Option[Long], + maxNumberOfColumns: Option[Long], additionalOptions: Option[String] ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala index 41746a0c5..32e395c44 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala +++ 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala @@ -43,12 +43,29 @@ trait ExecutionsTable { def attemptNumber = column[Int]("attempt_number") def numberOfAttempts = column[Int]("number_of_attempts") def failureReason = column[Option[String]]("failure_reason") + def numberOfRecordsIngested = column[Option[Long]]("number_of_records_ingested") + def maxNumberOfColumns = column[Option[Long]]("max_number_of_columns") def additionalOptions = column[Option[String]]("additional_options") - def * = (pipelineId, pipelineName, environmentName, batchId, sparkApplicationId, - computeEngineId, tenant, country, runDateFrom, runDateTo, startedAt, finishedAt, numberOfExecutorsMin, - numberOfExecutorsMax, executorType, status, isRerun, attemptNumber, numberOfAttempts, failureReason, - additionalOptions) <> (Execution.tupled, Execution.unapply) + private type ExecutionTuple = ( + (String, String, String, Long, String, Option[String], Option[String], Option[String], String, Option[String], Long, Long), + (Option[Int], Option[Int], Option[String], String, Boolean, Int, Int, Option[String], Option[Long], Option[Long], Option[String]) + ) + + private def toExecution(t: ExecutionTuple): Execution = Execution( + t._1._1, t._1._2, t._1._3, t._1._4, t._1._5, t._1._6, t._1._7, t._1._8, t._1._9, t._1._10, t._1._11, t._1._12, + t._2._1, t._2._2, t._2._3, t._2._4, t._2._5, t._2._6, t._2._7, t._2._8, t._2._9, t._2._10, t._2._11 + ) + + private def fromExecution(e: Execution): Option[ExecutionTuple] = Some( + (e.pipelineId, e.pipelineName, e.environmentName, e.batchId, e.sparkApplicationId, e.computeEngineId, e.tenant, e.country, e.runDateFrom, e.runDateTo, e.startedAt, e.finishedAt), + (e.numberOfExecutorsMin, e.numberOfExecutorsMax, e.executorType, e.status, e.isRerun, e.attemptNumber, e.numberOfAttempts, e.failureReason, e.numberOfRecordsIngested, e.maxNumberOfColumns, e.additionalOptions) + ) + + def * = ( + (pipelineId, pipelineName, environmentName, 
batchId, sparkApplicationId, computeEngineId, tenant, country, runDateFrom, runDateTo, startedAt, finishedAt), + (numberOfExecutorsMin, numberOfExecutorsMax, executorType, status, isRerun, attemptNumber, numberOfAttempts, failureReason, numberOfRecordsIngested, maxNumberOfColumns, additionalOptions) + ) <> (toExecution, fromExecution) def idx1 = index("idx_exec_started_at", startedAt, unique = false) def idx2 = index("idx_exec_finished_at", finishedAt, unique = false) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala index 17bde2bc6..441395016 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, SparkSession} import za.co.absa.pramen.api.jobdef.SourceTable import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} -import za.co.absa.pramen.api.{Query, Reason, Source, SourceResult} +import za.co.absa.pramen.api._ import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.metastore.Metastore @@ -33,6 +33,7 @@ import za.co.absa.pramen.core.utils.Emoji.WARNING import za.co.absa.pramen.core.utils.SparkUtils._ import java.time.{Instant, LocalDate} +import scala.util.Try class IngestionJob(operationDef: OperationDef, metastore: Metastore, @@ -168,6 +169,16 @@ class IngestionJob(operationDef: OperationDef, inputRecordCount: Option[Long]): SaveResult = { val stats = metastore.saveTable(outputTable.name, infoDate, df, inputRecordCount) + if (!outputTable.format.isRaw) { + val pramenOpt = Try { + Pramen.instance + }.toOption + + pramenOpt.foreach { pramen => + pramen.addNumberOfRecordsIngested(stats.recordCount.getOrElse(0L)) + 
} + } + try { source.postProcess( sourceTable.query, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala index ef0c8d408..ab7844a50 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala @@ -162,6 +162,11 @@ object AppRunner { setMinMaxExecutors(spark, state) setExecutionAdditionalProperties(spark, state) + log.info("Spark configuration properties:") + spark.conf.getAll.toSeq.sortBy(_._1).foreach { case (key, value) => + log.info(s" $key = $value") + } + }, state, "Spark List of executor nodes") } @@ -177,8 +182,8 @@ object AppRunner { val executors = spark.sparkContext.getExecutorMemoryStatus.keySet .filter(_ != "driver") - val maxNumExecutors = spark.conf.getOption("spark.executor.instances").orElse( - spark.conf.getOption("spark.dynamicAllocation.maxExecutors")) match { + val maxNumExecutors = spark.conf.getOption("spark.dynamicAllocation.maxExecutors").orElse( + spark.conf.getOption("spark.executor.instances")) match { case Some(s) => s.toInt case None => executors.size } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index d60126ce4..e5e34cff6 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -40,7 +40,7 @@ import za.co.absa.pramen.core.state.PipelineState import za.co.absa.pramen.core.utils.Emoji._ import za.co.absa.pramen.core.utils.SparkUtils._ import za.co.absa.pramen.core.utils.hive.HiveHelper -import za.co.absa.pramen.core.utils.{ConfigUtils, ThreadUtils, TimeUtils} +import za.co.absa.pramen.core.utils.{ConfigUtils, SparkUtils, ThreadUtils, TimeUtils} import java.sql.Date 
import java.time.{Instant, LocalDate} @@ -432,8 +432,9 @@ abstract class TaskRunnerBase(conf: Config, val outputMetastoreHiveTable = task.job.outputTable.hiveTable.map(table => HiveHelper.getFullTable(task.job.outputTable.hiveConfig.database, table)) val hiveTableUpdates = (saveResult.hiveTablesUpdates ++ outputMetastoreHiveTable).distinct - val stats = saveResult.stats + pipelineState.setMaximumNumberOfColumns(SparkUtils.getTotalNumberOfColumns(dfTransformed.schema)) + val stats = saveResult.stats val finished = Instant.now() val completionReason = if (validationResult.status == NeedsUpdate || (validationResult.status == AlreadyRan && task.reason != TaskRunReason.Rerun)) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala index d1bf87583..454485606 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala @@ -46,6 +46,12 @@ trait PipelineState extends AutoCloseable { def setExecutionAdditionalOption(key: String, value: String): Unit + def setNumberOfRecordsIngested(count: Long): Unit + + def addNumberOfRecordsIngested(count: Long): Unit + + def setMaximumNumberOfColumns(count: Long): Unit + def addTaskCompletion(statuses: Seq[TaskResult]): Unit def getExitCode: Int diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala index 714cddeae..448aa9fde 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala @@ -81,6 +81,8 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification @volatile private var numberOfExecutorsMin: Option[Int] = None @volatile private var numberOfExecutorsMax: 
Option[Int] = None @volatile private var executorType: Option[String] = None + @volatile private var numberOfRecordsIngested: Option[Long] = None + @volatile private var maxNumberOfColumns: Option[Long] = None init() @@ -219,6 +221,18 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification executionAdditionalOptions.put(key, value) } + override def setNumberOfRecordsIngested(count: Long): Unit = synchronized { + numberOfRecordsIngested = Option(count) + } + + override def addNumberOfRecordsIngested(count: Long): Unit = synchronized { + numberOfRecordsIngested = Option(numberOfRecordsIngested.getOrElse(0L) + count) + } + + override def setMaximumNumberOfColumns(count: Long): Unit = synchronized { + maxNumberOfColumns = Option(Math.max(maxNumberOfColumns.getOrElse(0L), count)) + } + override def addTaskCompletion(statuses: Seq[TaskResult]): Unit = synchronized { taskResults ++= statuses.filter(_.runStatus != NotRan) if (statuses.exists(_.runStatus.isFailure)) { @@ -353,6 +367,8 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification runtimeConfig.attempt, runtimeConfig.maxAttempts, failureReason.map(_.take(1000)), + numberOfRecordsIngested, + maxNumberOfColumns, getExecutionAdditionalOptions ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala index faae0e78a..08b9d96d3 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala @@ -934,6 +934,25 @@ object SparkUtils { } } + def getTotalNumberOfColumns(schema: StructType): Int = { + def countNestedColumns(dataType: DataType): Int = { + dataType match { + case struct: StructType => + struct.fields.foldLeft(0) { (count, field) => + count + 1 + countNestedColumns(field.dataType) + } + case arr: ArrayType => + countNestedColumns(arr.elementType) + case _ 
=> + 0 + } + } + + schema.fields.foldLeft(0) { (count, field) => + count + 1 + countNestedColumns(field.dataType) + } + } + private def getActualProcessingTimeUdf: UserDefinedFunction = { udf((_: Long) => Instant.now().getEpochSecond) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala index fdc3aa126..fb3a192c3 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala @@ -40,6 +40,8 @@ class PipelineStateSpy extends PipelineState { var numberOfExecutorsMin: Option[Int] = None var numberOfExecutorsMax: Option[Int] = None var executorType: Option[String] = None + var numberOfRecordsIngested: Option[Long] = None + var maxNumberOfColumns: Option[Long] = None override def getState: PipelineStateSnapshot = { PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(PipelineInfoFactory.getDummyPipelineInfo(sparkApplicationId = sparkAppId), @@ -96,6 +98,18 @@ class PipelineStateSpy extends PipelineState { executorType = Option(executorTypeIn) } + override def setNumberOfRecordsIngested(count: Long): Unit = synchronized { + numberOfRecordsIngested = Option(count) + } + + override def addNumberOfRecordsIngested(count: Long): Unit = synchronized { + numberOfRecordsIngested = Option(numberOfRecordsIngested.getOrElse(0L) + count) + } + + override def setMaximumNumberOfColumns(count: Long): Unit = synchronized { + maxNumberOfColumns = Option(Math.max(maxNumberOfColumns.getOrElse(0L), count)) + } + override def setExecutionAdditionalOption(key: String, value: String): Unit = synchronized { executionAdditionalOptions.put(key, value) } From 0e4acfe35905db56d59207728615b5ce7f3b7e45 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 12 May 2026 11:03:29 +0200 Subject: [PATCH 7/7] #738 Add pipeline definition id 
that is added to the executions table. --- .../za/co/absa/pramen/api/PipelineInfo.scala | 1 + .../za/co/absa/pramen/core/config/Keys.scala | 2 + .../pramen/core/journal/JournalDynamoDB.scala | 4 +- .../core/journal/JournalHadoopDeltaPath.scala | 6 +- .../journal/JournalHadoopDeltaTable.scala | 4 +- .../pramen/core/journal/model/Execution.scala | 1 + .../core/journal/model/ExecutionsTable.scala | 17 +-- .../absa/pramen/core/runner/AppRunner.scala | 12 +- .../pramen/core/state/PipelineStateImpl.scala | 10 +- .../absa/pramen/core/utils/SparkUtils.scala | 9 ++ .../absa/pramen/core/GlueIcebergSuite.scala | 72 ---------- .../core/mocks/PipelineInfoFactory.scala | 3 +- .../core/tests/utils/SparkUtilsSuite.scala | 124 ++++++++++++++++++ .../EcsPipelineNotificationTargetSuite.scala | 2 +- 14 files changed, 168 insertions(+), 99 deletions(-) delete mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/GlueIcebergSuite.scala diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala index c98d9e1cd..1c9ddf4c0 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala @@ -22,6 +22,7 @@ import java.time.Instant case class PipelineInfo( pipelineName: String, + pipelineDefinitionId: String, environment: String, runtimeInfo: RuntimeInfo, startedAt: Instant, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala index b3b64c231..ebfe453a1 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala @@ -17,6 +17,8 @@ package za.co.absa.pramen.core.config object Keys { + val PIPELINE_DEFINITION_ID = "pramen.pipeline.definition.id" + val INFORMATION_DATE_COLUMN = "pramen.information.date.column" val 
INFORMATION_DATE_FORMAT_APP = "pramen.information.date.format" diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala index b8bdb2b6a..fb58535cf 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala @@ -130,6 +130,7 @@ class JournalDynamoDB private ( itemBuilder += (JournalDynamoDB.ATTR_EXEC_BATCH_ID -> AttributeValue.builder().n(execution.batchId.toString).build()) itemBuilder += (JournalDynamoDB.ATTR_EXEC_PIPELINE_ID -> AttributeValue.builder().s(execution.pipelineId).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_PIPELINE_DEFINITION_ID -> AttributeValue.builder().s(execution.pipelineDefinitionId).build()) itemBuilder += (JournalDynamoDB.ATTR_EXEC_ENVIRONMENT_NAME -> AttributeValue.builder().s(execution.environmentName).build()) itemBuilder += (JournalDynamoDB.ATTR_EXEC_SPARK_APP_ID -> AttributeValue.builder().s(execution.sparkApplicationId).build()) execution.computeEngineId.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_COMPUTE_ENGINE_ID -> AttributeValue.builder().s(v).build())) @@ -355,6 +356,7 @@ object JournalDynamoDB { // Attribute names for executions table val ATTR_EXEC_PIPELINE_ID = "pipeline_id" + val ATTR_EXEC_PIPELINE_DEFINITION_ID = "pipeline_definition_id" val ATTR_EXEC_PIPELINE_NAME = "pipeline_name" val ATTR_EXEC_ENVIRONMENT_NAME = "environment_name" val ATTR_EXEC_BATCH_ID = "batch_id" @@ -401,7 +403,7 @@ object JournalDynamoDB { val ATTR_ENVIRONMENT_NAME = "environment_name" val ATTR_TENANT = "tenant" val ATTR_COUNTRY = "country" - val ATTR_BATCH_ID = "batchId" + val ATTR_BATCH_ID = "batchid" /** * Builder for creating JournalDynamoDB instances. 
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala index 05aefa69a..7a8ab710f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala @@ -42,7 +42,7 @@ class JournalHadoopDeltaPath(journalPath: String, executionsPath: String) recordDf .write .mode(SaveMode.Append) - .option("format", "delta") + .format("delta") .option("mergeSchema", "true") .save(journalPath) } @@ -57,7 +57,7 @@ class JournalHadoopDeltaPath(journalPath: String, executionsPath: String) recordDf .write .mode(SaveMode.Append) - .option("format", "delta") + .format("delta") .option("mergeSchema", "true") .save(executionsPath) } @@ -71,7 +71,7 @@ class JournalHadoopDeltaPath(journalPath: String, executionsPath: String) val df = spark .read - .option("format", "delta") + .format("delta") .load(journalPath) df.filter(col("finishedAt") >= from.getEpochSecond && col("finishedAt") <= to.getEpochSecond) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaTable.scala index 33d5c18da..11425820a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaTable.scala @@ -38,7 +38,7 @@ class JournalHadoopDeltaTable(database: Option[String], recordDf .write .mode(SaveMode.Append) - .option("format", "delta") + .format("delta") .option("mergeSchema", "true") .saveAsTable(getFullTableName(database, tablePrefix, "journal")) } @@ -53,7 +53,7 @@ class JournalHadoopDeltaTable(database: Option[String], recordDf .write .mode(SaveMode.Append) - .option("format", "delta") + .format("delta") 
.option("mergeSchema", "true") .saveAsTable(getFullTableName(database, tablePrefix, "executions")) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala index 30497b4e2..a0556b4b2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala @@ -18,6 +18,7 @@ package za.co.absa.pramen.core.journal.model case class Execution( pipelineId: String, + pipelineDefinitionId: String, pipelineName: String, environmentName: String, batchId: Long, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala index 32e395c44..acdb4516f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala @@ -23,7 +23,8 @@ trait ExecutionsTable { import profile.api._ class ExecutionsRecords(tag: Tag) extends Table[Execution](tag, "executions") { - def pipelineId = column[String]("pipelineId", O.Length(40)) + def pipelineId = column[String]("pipeline_id", O.Length(40)) + def pipelineDefinitionId = column[String]("pipeline_definition_id", O.Length(50)) def pipelineName = column[String]("pipeline_name", O.Length(200)) def environmentName = column[String]("environment_name", O.Length(128)) def batchId = column[Long]("batch_id") @@ -48,30 +49,30 @@ trait ExecutionsTable { def additionalOptions = column[Option[String]]("additional_options") private type ExecutionTuple = ( - (String, String, String, Long, String, Option[String], Option[String], Option[String], String, Option[String], Long, Long), + (String, String, String, String, Long, String, Option[String], Option[String], Option[String], String, 
Option[String], Long, Long), (Option[Int], Option[Int], Option[String], String, Boolean, Int, Int, Option[String], Option[Long], Option[Long], Option[String]) ) private def toExecution(t: ExecutionTuple): Execution = Execution( - t._1._1, t._1._2, t._1._3, t._1._4, t._1._5, t._1._6, t._1._7, t._1._8, t._1._9, t._1._10, t._1._11, t._1._12, + t._1._1, t._1._2, t._1._3, t._1._4, t._1._5, t._1._6, t._1._7, t._1._8, t._1._9, t._1._10, t._1._11, t._1._12, t._1._13, t._2._1, t._2._2, t._2._3, t._2._4, t._2._5, t._2._6, t._2._7, t._2._8, t._2._9, t._2._10, t._2._11 ) private def fromExecution(e: Execution): Option[ExecutionTuple] = Some( - (e.pipelineId, e.pipelineName, e.environmentName, e.batchId, e.sparkApplicationId, e.computeEngineId, e.tenant, e.country, e.runDateFrom, e.runDateTo, e.startedAt, e.finishedAt), + (e.pipelineId, e.pipelineDefinitionId, e.pipelineName, e.environmentName, e.batchId, e.sparkApplicationId, e.computeEngineId, e.tenant, e.country, e.runDateFrom, e.runDateTo, e.startedAt, e.finishedAt), (e.numberOfExecutorsMin, e.numberOfExecutorsMax, e.executorType, e.status, e.isRerun, e.attemptNumber, e.numberOfAttempts, e.failureReason, e.numberOfRecordsIngested, e.maxNumberOfColumns, e.additionalOptions) ) def * = ( - (pipelineId, pipelineName, environmentName, batchId, sparkApplicationId, computeEngineId, tenant, country, runDateFrom, runDateTo, startedAt, finishedAt), + (pipelineId, pipelineDefinitionId, pipelineName, environmentName, batchId, sparkApplicationId, computeEngineId, tenant, country, runDateFrom, runDateTo, startedAt, finishedAt), (numberOfExecutorsMin, numberOfExecutorsMax, executorType, status, isRerun, attemptNumber, numberOfAttempts, failureReason, numberOfRecordsIngested, maxNumberOfColumns, additionalOptions) ) <> (toExecution, fromExecution) def idx1 = index("idx_exec_started_at", startedAt, unique = false) def idx2 = index("idx_exec_finished_at", finishedAt, unique = false) - def idx3 = index("idx_exec_batchid", finishedAt, unique = 
false) - def idx4 = index("idx_exec_compute_engine_id", finishedAt, unique = false) - def idx5 = index("idx_exec_pipeline_id", finishedAt, unique = false) + def idx3 = index("idx_exec_batchid", batchId, unique = false) + def idx4 = index("idx_exec_compute_engine_id", computeEngineId, unique = false) + def idx5 = index("idx_exec_pipeline_id", pipelineId, unique = false) } lazy val records = TableQuery[ExecutionsRecords] diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala index ab7844a50..32c609f2c 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala @@ -161,12 +161,6 @@ object AppRunner { setExecutorNodeType(spark, state) setMinMaxExecutors(spark, state) setExecutionAdditionalProperties(spark, state) - - log.info("Spark configuration properties:") - spark.conf.getAll.toSeq.sortBy(_._1).foreach { case (key, value) => - log.info(s" $key = $value") - } - }, state, "Spark List of executor nodes") } @@ -184,7 +178,7 @@ object AppRunner { val maxNumExecutors = spark.conf.getOption("spark.dynamicAllocation.maxExecutors").orElse( spark.conf.getOption("spark.executor.instances")) match { - case Some(s) => s.toInt + case Some(s) => Try(s.toInt).toOption.getOrElse(executors.size) case None => executors.size } @@ -194,7 +188,7 @@ object AppRunner { val minNumExecutors = if (dynamicAllocEnabled) { spark.conf.getOption("spark.dynamicAllocation.minExecutors") match { - case Some(s) => s.toInt + case Some(s) => Try(s.toInt).toOption.getOrElse(1) case None => 1 } } else { @@ -206,7 +200,7 @@ object AppRunner { private[core] def getNumberOfExecutorCores(spark: SparkSession): Int = { spark.conf.getOption("spark.executor.cores") match { - case Some(s) => s.toInt + case Some(s) => Try(s.toInt).toOption.getOrElse(Runtime.getRuntime.availableProcessors()) case None => 
Runtime.getRuntime.availableProcessors() } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala index 448aa9fde..3111b802a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala @@ -26,7 +26,7 @@ import za.co.absa.pramen.api.status._ import za.co.absa.pramen.api.{NotificationBuilder, PipelineInfo, PipelineNotificationTarget, RunMode} import za.co.absa.pramen.core.app.config.RuntimeConfig.{DRY_RUN, EMAIL_IF_NO_CHANGES, UNDERCOVER} import za.co.absa.pramen.core.app.config.{HookConfig, RuntimeConfig} -import za.co.absa.pramen.core.config.Keys.{GOOD_THROUGHPUT_RPS, WARN_THROUGHPUT_RPS} +import za.co.absa.pramen.core.config.Keys.{GOOD_THROUGHPUT_RPS, PIPELINE_DEFINITION_ID, WARN_THROUGHPUT_RPS} import za.co.absa.pramen.core.exceptions.OsSignalException import za.co.absa.pramen.core.journal.Journal import za.co.absa.pramen.core.journal.model.Execution @@ -124,6 +124,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification } else failureException + val pipelineDefinitionId = ConfigUtils.getOptionString(conf, PIPELINE_DEFINITION_ID).getOrElse("") val minRps = ConfigUtils.getOptionInt(conf, WARN_THROUGHPUT_RPS).getOrElse(0) val goodRps = ConfigUtils.getOptionInt(conf, GOOD_THROUGHPUT_RPS).getOrElse(0) val dryRun = ConfigUtils.getOptionBoolean(conf, DRY_RUN).getOrElse(false) @@ -132,6 +133,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification PipelineInfo( pipelineName, + pipelineDefinitionId, environmentName, RuntimeInfo( runtimeConfig.runDate, @@ -264,7 +266,10 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification sendPipelineNotifications() runCustomShutdownHook() removeSignalHandlers() - addJournalEntry() + Try(addJournalEntry()).recover 
{ + case NonFatal(ex) => + log.error("Unable to write pipeline execution journal entry.", ex) + } sendNotificationEmail() TokenLockRegistry.releaseAllLocks() } @@ -348,6 +353,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification journalOpt.foreach { journal => val execution = Execution( pipelineId, + pipelineInfo.pipelineDefinitionId, pipelineName, environmentName, batchId, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala index 08b9d96d3..4726696d3 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala @@ -934,6 +934,15 @@ object SparkUtils { } } + /** + * Calculates the total number of columns in a given schema, including all nested columns + * within struct types. Array types are traversed to count any nested struct columns within + * their element types, but the array itself is not counted as an additional column beyond + * its top-level entry. + * + * @param schema the StructType representing the schema whose columns are to be counted + * @return the total number of columns, including all columns found in nested struct types + */ def getTotalNumberOfColumns(schema: StructType): Int = { def countNestedColumns(dataType: DataType): Int = { dataType match { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/GlueIcebergSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/GlueIcebergSuite.scala deleted file mode 100644 index 2aada79c3..000000000 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/GlueIcebergSuite.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2022 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.script - -import org.apache.spark.sql.{DataFrame, SparkSession} -import org.slf4j.LoggerFactory - -import java.io.ByteArrayOutputStream - -/** - * This is the Glue job to use for running Pramen on Glue. - * - * See documentation of AqueductPramenJob in 'pramen-components' module for the info on usage. - */ - -object MyApp { - private val log = LoggerFactory.getLogger(this.getClass) - - def showString(df: DataFrame, numRows: Int = 20): String = { - val outCapture = new ByteArrayOutputStream - Console.withOut(outCapture) { - df.show(numRows, truncate = false) - } - new String(outCapture.toByteArray).replace("\r\n", "\n") - } - - // spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions - // --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog - // --conf spark.sql.catalog.glue_catalog.warehouse=s3://aqueduct-pipelines-dev-application-data/iceberg-warehouse - // --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog - // --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO - // --conf spark.sql.defaultCatalog=glue_catalog - def main(args: Array[String]): Unit = { - val spark: SparkSession = SparkSession.builder() - .appName("test") - .config("spark.sql.parquet.writeLegacyFormat", "true") - .config("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY") - .config("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY") - .config("spark.sql.extensions", 
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") - .config("spark.sql.catalog.glue_catalog.warehouse", "s3://aqueduct-pipelines-uat-application-data/iceberg-warehouse") - .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") - .config("spark.sql.catalog.glue_catalog.io", "org.apache.iceberg.aws.s3.S3FileIO") - .config("spark.sql.catalog.glue_catalog.s3.access-points.ursamajor-afs1-dev-edla-aqdt-za", "arn:aws:s3:af-south-1:131405913869:accesspoint/aqdt-za-npintdeaqueduct") - .config("spark.sql.defaultCatalog", "glue_catalog") - .getOrCreate() - - //log.error(showString(spark.catalog.listTables("edla_dev_aqdt_za_publish_rl").toDF())) - //log.error(showString(spark.table("glue_catalog.edla_dev_aqdt_za_publish_rl.aq_journal_iceberg4"))) - - //spark.sql( - // """ - // | ALTER TABLE glue_catalog.edla_dev_aqdt_za_publish_rl.aq_journal_iceberg4 CREATE TAG my_tag AS OF VERSION 7314587597447473054 - // |""".stripMargin).collect() - - val df1 = spark.sql("SELECT * FROM glue_catalog.edla_dev_aqdt_za_publish_rl.aq_journal_iceberg4.refs") - log.error(showString(df1)) - } -} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala index 206e2f723..29aaa643a 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala @@ -23,6 +23,7 @@ import java.time.{Instant, LocalDate} object PipelineInfoFactory { def getDummyPipelineInfo(pipelineName: String = "Dummy Pipeline", + pipelineDefinitionId: String = "", environment: String = "DEV", runtimeInfo: RuntimeInfo = RuntimeInfo(LocalDate.parse("2022-02-18")), startedAt: Instant = Instant.ofEpochSecond(1718609409), @@ -36,6 +37,6 @@ object PipelineInfoFactory { tenant: Option[String] = Some("Dummy tenant"), country: Option[String] = 
Some("noname"), batchId: Long = 123L): PipelineInfo = { - PipelineInfo(pipelineName, environment, runtimeInfo, startedAt, finishedAt, warningFlag, sparkApplicationId, pipelineStatus, failureException, pipelineNotificationFailures, pipelineId, tenant, country, batchId) + PipelineInfo(pipelineName, pipelineDefinitionId, environment, runtimeInfo, startedAt, finishedAt, warningFlag, sparkApplicationId, pipelineStatus, failureException, pipelineNotificationFailures, pipelineId, tenant, country, batchId) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala index 22a23bb33..4f06ed983 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala @@ -1146,4 +1146,128 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture } } + "getTotalNumberOfColumns" should { + "return the number of columns for a flat schema" in { + val schema = StructType(Array( + StructField("id", IntegerType), + StructField("name", StringType), + StructField("age", IntegerType) + )) + + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 3) + } + + "return the total number of columns including nested struct fields" in { + val schema = StructType(Array( + StructField("id", IntegerType), + StructField("address", StructType(Array( + StructField("street", StringType), + StructField("city", StringType), + StructField("zip", StringType) + ))) + )) + + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 5) + } + + "return the total number of columns including array of structs" in { + val schema = StructType(Array( + 
StructField("id", IntegerType), + StructField("phones", ArrayType(StructType(Array( + StructField("type", StringType), + StructField("number", StringType) + )))) + )) + + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 4) + } + + "return the total number of columns for a deeply nested schema" in { + val schema = StructType(Array( + StructField("id", LongType), + StructField("level1", StructType(Array( + StructField("field1", StringType), + StructField("level2", StructType(Array( + StructField("field2", IntegerType), + StructField("level3", ArrayType(StructType(Array( + StructField("field3", StringType), + StructField("field4", DoubleType) + )))) + ))) + ))) + )) + + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 8) + } + + "return 0 for an empty schema" in { + val schema = StructType(Array.empty[StructField]) + + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 0) + } + + "count array of primitives as a single column" in { + val schema = StructType(Array( + StructField("id", IntegerType), + StructField("tags", ArrayType(StringType)), + StructField("name", StringType) + )) + + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 3) + } + + "handle multiple nested structs at the same level" in { + val schema = StructType(Array( + StructField("id", IntegerType), + StructField("struct1", StructType(Array( + StructField("a", StringType), + StructField("b", StringType) + ))), + StructField("struct2", StructType(Array( + StructField("c", IntegerType), + StructField("d", IntegerType), + StructField("e", IntegerType) + ))) + )) + + val df = 
spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 8) + } + + "handle the test case schema from NestedDataFrameFactory" in { + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], NestedDataFrameFactory.testCaseSchema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 29) + } + } + } diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala index 1809bf209..ced98cc1c 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala @@ -62,7 +62,7 @@ class EcsPipelineNotificationTargetSuite extends AnyWordSpec { val task3 = TestPrototypes.taskNotification.copy(taskDef = taskDef3) notificationTarget.sendNotification( - PipelineInfo("Dummy", "DEV", RuntimeInfo(LocalDate.parse("2022-02-18")), Instant.now, None, warningFlag = false, None, PipelineStatus.Success, None, Seq.empty, "pid_123", None, None, 123L), + PipelineInfo("Dummy", "", "DEV", RuntimeInfo(LocalDate.parse("2022-02-18")), Instant.now, None, warningFlag = false, None, PipelineStatus.Success, None, Seq.empty, "pid_123", None, None, 123L), Seq(task1, task2, task3), CustomNotification(Seq.empty, Seq.empty) )