From c2342fcbddd20cad01c45bb0d137299f74e2f4a0 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Tue, 24 Feb 2026 18:58:31 +0100 Subject: [PATCH 01/19] temp workaround for dupl_hash --- api/models/Datapoint.js | 6 ++++-- api/services/service.js | 7 +++++-- config.template.js | 1 + 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/api/models/Datapoint.js b/api/models/Datapoint.js index 4cb547e..706293c 100644 --- a/api/models/Datapoint.js +++ b/api/models/Datapoint.js @@ -1,5 +1,6 @@ const mongoose = require("mongoose"); const crypto = require("crypto"); +const config = require("../../config") // Funzione di utilità per pulire il nome della survey const cleanSurveyName = (val) => { @@ -17,7 +18,7 @@ const datapointSchema = new mongoose.Schema( timestamp: { type: String, index: true }, dimensions: { type: Object }, value: Number, - dupl_hash: { type: String }, // Identificatore unico per evitare duplicati + dupl_hash: { type: String, unique : config.upsertRecords == true }, // Identificatore unico per evitare duplicati }, { strict: false, @@ -37,11 +38,12 @@ datapointSchema.pre("save", function (next) { // Funzione per generare l'hash unico (usata nell'inserimento massivo) const generateHash = (doc) => { + return JSON.stringify(doc) // Gestiamo sia se arriva come documento Mongoose sia come oggetto puro const target = doc._doc || doc; const dims = target.dimensions || []; - const sortedDims = [...dims].sort(); + const sortedDims = [...dims].sort();//FIXME funziona solo con gli array, ci vorrebbe Object.entries() const sortedKeys = sortedDims.join("|"); const s = cleanSurveyName(target.survey); diff --git a/api/services/service.js b/api/services/service.js index 5b32f16..2e30525 100644 --- a/api/services/service.js +++ b/api/services/service.js @@ -140,7 +140,7 @@ module.exports = { } }) if (response.data[0]) { - if (!purged) + if (!purged && !config.upsertRecords) await Datapoints.deleteMany({ survey: response.data[0].survey }); // Cancellazione preliminare const dataToInsert = response.data.map((d) => { // Preparazione dei dati return { @@ -148,7 +148,10 @@ module.exports = { fromUrl: urlValue, }; }); - await Datapoints.upsertMany(dataToInsert); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion + if(config.upsertRecords) + await Datapoints.upsertMany(dataToInsert); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion + else + await Datapoints.insertMany(dataToInsert) lastId = response.data[response.data.length - 1]?._id // Gestione indici per il prossimo loop purged = true; } diff --git a/config.template.js b/config.template.js index db78b82..60d289d 100644 --- a/config.template.js +++ b/config.template.js @@ -38,6 +38,7 @@ module.exports = { logLevel: "info", syncInterval: 86400000, doNotSyncAtStart: false, + upsertRecords: true, delays: 1, queryAllowedExtensions: ["csv", "json", "geojson"], parseCompatibilityMode: 0, From fe8a446d9c2d924617eb7e1c5db1f1806f1db6a7 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Wed, 25 Feb 2026 19:26:43 +0100 Subject: [PATCH 02/19] faster query --- api/models/Dimensions.js | 6 ++++++ api/services/service.js | 33 ++++++++++++++++++++++++++++++--- 2 files changed, 36 insertions(+), 3 deletions(-) create mode 100644 api/models/Dimensions.js diff --git a/api/models/Dimensions.js b/api/models/Dimensions.js new file mode 100644 index 0000000..3c3cd60 --- /dev/null +++ b/api/models/Dimensions.js @@ -0,0 +1,6 @@ +const mongoose = require("mongoose"); + +const dimensions + = new mongoose.Schema({}, { strict: false, versionKey: false }); + +module.exports = mongoose.model("dimensions", dimensions); \ No newline at end of file diff --git a/api/services/service.js b/api/services/service.js index 2e30525..48281ce 100644 --- a/api/services/service.js +++ b/api/services/service.js @@ -1,6 +1,7 @@ const logger = require("percocologger"); const log = logger.info; const Datapoints = require("../models/Datapoint"); +const Dimensions = require("../models/Dimensions"); const config = require("../../config"); const minioWriter = require("../../inputConnectors/minioConnector"); const axios = require("axios"); @@ -140,7 +141,7 @@ module.exports = { } }) if (response.data[0]) { - if (!purged && !config.upsertRecords) + if (!purged && !config.upsertRecords) await Datapoints.deleteMany({ survey: response.data[0].survey }); // Cancellazione preliminare const dataToInsert = response.data.map((d) => { // Preparazione dei dati return { @@ -148,12 +149,38 @@ module.exports = { fromUrl: urlValue, }; }); - if(config.upsertRecords) + if (config.upsertRecords) await Datapoints.upsertMany(dataToInsert); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion - else + else await Datapoints.insertMany(dataToInsert) lastId = response.data[response.data.length - 1]?._id // Gestione indici per il prossimo loop purged = true; + const surveyKey = dataToInsert[0].survey.toUpperCase().replace(/\./g, ""); + const dimensionsFound = await Dimensions.findOne({ survey: surveyKey }); // direttamente un singolo documento + const uniqueKeys = new Set(); + for (const obj of dataToInsert) { + for (const key in obj.dimensions) { + uniqueKeys.add(key); + } + } + if (dimensionsFound) { + for (const key in dimensionsFound.dimensions) { + uniqueKeys.add(key); + } + } + let dimensionsObject = {} + for (let key of Array.from(uniqueKeys)) + dimensionsObject[key] = true + const dimensionObject = { + dimensions: dimensionsObject, + survey: surveyKey + }; + await Dimensions.findOneAndUpdate( + { survey: surveyKey }, + dimensionObject, + { upsert: true, new: true } + ); + logger.debug("Dimension object saved/updated:", dimensionObject); } } } catch (error) { From b16439d8ebc384c1338b9e75ed73b9020c4b13f0 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Thu, 12 Mar 2026 17:19:32 +0100 Subject: [PATCH 03/19] verify lost subscriptions and minimal view for empty cache added --- api/models/Entity.js | 5 +++++ api/services/service.js | 12 +++++++++- config.template.js | 4 +++- index.js | 9 +++++++- inputConnectors/apiConnector.js | 2 +- utils/common.js | 39 ++++++++++++++++++++++++++------- 6 files changed, 59 insertions(+), 12 deletions(-) create mode 100644 api/models/Entity.js diff --git a/api/models/Entity.js b/api/models/Entity.js new file mode 100644 index 0000000..09e7ca9 --- /dev/null +++ b/api/models/Entity.js @@ -0,0 +1,5 @@ +const mongoose = require("mongoose"); + +const entity = new mongoose.Schema({}, { strict: false, versionKey: false }) + +module.exports = mongoose.model("entities", entity); \ No newline at end of file diff --git a/api/services/service.js b/api/services/service.js index 48281ce..c5039dc 100644 --- a/api/services/service.js +++ b/api/services/service.js @@ -8,6 +8,7 @@ const axios = require("axios"); const fs = require("fs"); const { updateJWT } = require("../../utils/keycloak"); let bearerToken; +const Entity = require("../models/Entity") updateJWT() .then((token) => { bearerToken = token; @@ -28,6 +29,15 @@ module.exports = { for (const ent of entities) { const id = ent.id || ent["@id"] || "unknown-id"; + const modifiedDate = ent.modifiedDate.value["@value"] + let existingEntity = await Entity.findOne({ id }) + if (existingEntity) + if (existingEntity.modifiedDate["@value"] != modifiedDate) + await Entity.findOneAndUpdate(ent) + else + return "Ok" + else + await Entity.insertMany([ent]) let urlValue; if ( ent[attrWithUrl] && @@ -178,7 +188,7 @@ module.exports = { await Dimensions.findOneAndUpdate( { survey: surveyKey }, dimensionObject, - { upsert: true, new: true } + { upsert: true, new: true } ); logger.debug("Dimension object saved/updated:", dimensionObject); } diff --git a/config.template.js b/config.template.js index 60d289d..63e5201 100644 --- a/config.template.js +++ b/config.template.js @@ -31,9 +31,11 @@ module.exports = { deleteAllDuplicateSubscriptions: true, attrWithUrl: "datasetUrl", orionBaseUrl: "http://localhost:1027", + ngsiBrokerBaseUrl: "https://dx-lab.it/", notificationUrl: "http://localhost:3000/api/orion/subscribe", fiwareService: "", - fiwareServicePath: "" + fiwareServicePath: "", + checkSubscriptionInterval: 0 }, logLevel: "info", syncInterval: 86400000, diff --git a/index.js b/index.js index 95122c1..09c45a3 100644 --- a/index.js +++ b/index.js @@ -15,6 +15,13 @@ mongoose.connect(config.mongo, { useNewUrlParser: true, useUnifiedTopology: true app.use(express.urlencoded({ extended: false })); app.use(bodyParser.json()); app.use(config.basePath || "/api", routes); - app.listen(port, () => { logger.info(`Source connector server listens on http://localhost:${port}`); }); + app.listen(port, () => { + logger.info(`Source connector server listens on http://localhost:${port}`); + if (config.orion.checkSubscriptionInterval) + setInterval(common.verifyLostSubscription, config.checkSubscriptionInterval) + common.verifyLostSubscription().then(() => { + logger.info("lost subscription verified") + }) + }); logger.info(`Node.js version: ${process.version}`); }) diff --git a/inputConnectors/apiConnector.js b/inputConnectors/apiConnector.js index c773c06..22af779 100644 --- a/inputConnectors/apiConnector.js +++ b/inputConnectors/apiConnector.js @@ -146,4 +146,4 @@ async function checkMultipleSubscriptions(notificationUrl) { return count; } -module.exports = { createOrionSubscription }; \ No newline at end of file +module.exports = { createOrionSubscription, getEndpointVersionApi }; \ No newline at end of file diff --git a/utils/common.js b/utils/common.js index 271b254..a9b69d6 100644 --- a/utils/common.js +++ b/utils/common.js @@ -1,5 +1,8 @@ const logger = require('percocologger') const log = logger.info +const axios = require("axios") +const config = require("../config") +const Entity = require("../api/models/Entity") function objectCheck(objs) { for (let obj of objs) @@ -40,7 +43,7 @@ function convertCSVtoJSON(csvData) { obj[this.deleteSpaces(headers[j].replaceAll(/['"]/g, ''))] = this.deleteSpaces(currentLine[j]?.replaceAll(/['"]/g, '')); results.push(obj); } - + return JSON.stringify(results); } @@ -64,6 +67,26 @@ function syncEntries(obj, visibility, entries) { module.exports = { + async verifyLostSubscription() { + //la seguente riga non funzionerà . Probabilmente bisogna chiamare l'ngsi broker + //let entities = await axios.get(config.orion.orionBaseUrl + apiConnector.getEndpointVersionApi().split("subscriptions") + "/entities?type=DistributionDCAT-AP") + let entities = await axios.get(config.orion.ngsiBrokerBaseUrl + "/api/distributiondcatap") + for (let ent of entities) { + const existingEntity = await Entity.findOne(ent.id) + if (existingEntity && existingEntity.modifiedDate.value["@value"] != ent.modifiedDate["@value"]) + await axios.post("http://localhost:" + config.port || 3000 + "/api/orion/subscribe/6914a252ddb96948ee67b2e1", { + "id": "self", + "type": "Notification", + "subscriptionId": "self", + "notifiedAt": Date.now(), + "data": [ + ent + ] + }) + } + logger.info("Lost subscription verified") + }, + sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); }, @@ -83,7 +106,7 @@ module.exports = { }, async getEntries(obj, type, name, entries) {// csv, jsonArray, json - + let visibility = getVisibility(name) if (!obj[0].csv && Array.isArray(obj[0].json) && type != "jsonArray") type = "jsonArray" //throw new Error("obj is a jsonArray and not " + type) @@ -98,10 +121,10 @@ module.exports = { else { logger.trace(obj[0]) syncEntries(obj[0], visibility, entries) - + return } - + logger.trace("so it was a geojson") } logger.trace("Here's obj before flatmap") @@ -111,7 +134,7 @@ module.exports = { obj = obj.map(o => o.properties) for (let o of obj) syncEntries(o, visibility, entries) - + return }, @@ -156,7 +179,7 @@ module.exports = { ] const headers = possibleHeaders[0].length > possibleHeaders[1].length ? possibleHeaders[0] : possibleHeaders[1] const results = []; - + for (let i = 1; i < lines.length; i++) { const obj = {}; const currentLine = lines[i].trim().split(possibleHeaders[0].length > possibleHeaders[1].length ? "," : ";"); @@ -164,7 +187,7 @@ module.exports = { obj[this.deleteSpaces(headers[j].replaceAll(/['"]/g, ''))] = this.deleteSpaces(currentLine[j]?.replaceAll(/['"]/g, '')); results.push(obj); } - + return JSON.stringify(results); }, @@ -174,7 +197,7 @@ module.exports = { checkConfig(configIn, configTemplate) { for (let key in configTemplate) { - if (typeof configIn[key] == "object") + if (typeof configIn[key] == "object") configIn[key] = this.checkConfig(configIn[key], configTemplate[key]) else if (configIn[key] == undefined) { logger.warn(`Config key ${key} is missing, using default value`) From 7f223e3bae301bd1bfd7fb6a1cbd65c6652dda24 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Thu, 12 Mar 2026 17:44:34 +0100 Subject: [PATCH 04/19] FIFO --- api/controllers/controller.js | 8 +- api/services/service.js | 387 ++++++++++++++++++---------------- 2 files changed, 208 insertions(+), 187 deletions(-) diff --git a/api/controllers/controller.js b/api/controllers/controller.js index e43c4b7..a448ceb 100644 --- a/api/controllers/controller.js +++ b/api/controllers/controller.js @@ -3,7 +3,7 @@ const logger = require('percocologger') module.exports = { - + sync: async (req, res) => { logger.info("Sync") @@ -13,7 +13,11 @@ module.exports = { notifyPath: async (req, res) => { logger.info("Notification received") try { - res.send(await service.notifyPath(req, res)) + let response = await service.notifyPath(req, res) + if (typeof response == string) + return res.send(response) + else + res.status(500).send(response.toString() == "[object Object]" ? response : response.toString()) } catch (error) { logger.error(error) diff --git a/api/services/service.js b/api/services/service.js index c5039dc..c7fb6bb 100644 --- a/api/services/service.js +++ b/api/services/service.js @@ -9,6 +9,7 @@ const fs = require("fs"); const { updateJWT } = require("../../utils/keycloak"); let bearerToken; const Entity = require("../models/Entity") +const { sleep } = require("../../utils/common") updateJWT() .then((token) => { bearerToken = token; @@ -20,213 +21,229 @@ const path = require("path"); let attrWithUrl = config.orion?.attrWithUrl || "datasetUrl"; require("../../inputConnectors/apiConnector"); -module.exports = { - notifyPath: async (req, res) => { - logger.info({ body: JSON.stringify(req.body) }); +let requestStack = [] +async function executeRequest(req, res) { + logger.info({ body: JSON.stringify(req.body) }); - const data = req.body.data || req.body.value || req.body; - const entities = Array.isArray(data) ? data : [data]; + const data = req.body.data || req.body.value || req.body; + const entities = Array.isArray(data) ? data : [data]; - for (const ent of entities) { - const id = ent.id || ent["@id"] || "unknown-id"; - const modifiedDate = ent.modifiedDate.value["@value"] - let existingEntity = await Entity.findOne({ id }) - if (existingEntity) - if (existingEntity.modifiedDate["@value"] != modifiedDate) - await Entity.findOneAndUpdate(ent) - else - return "Ok" + for (const ent of entities) { + const id = ent.id || ent["@id"] || "unknown-id"; + const modifiedDate = ent.modifiedDate.value["@value"] + let existingEntity = await Entity.findOne({ id }) + if (existingEntity) + if (existingEntity.modifiedDate["@value"] != modifiedDate) + await Entity.findOneAndUpdate(ent) else - await Entity.insertMany([ent]) - let urlValue; - if ( - ent[attrWithUrl] && - typeof ent[attrWithUrl] === "object" && - "value" in ent[attrWithUrl] - ) { - urlValue = ent[attrWithUrl].value; - } else if (ent[attrWithUrl]) { - urlValue = ent[attrWithUrl]; - } else if (ent[attrWithUrl + ":value"]) { - urlValue = ent[attrWithUrl + ":value"]; - } else if (ent.value) { - urlValue = ent.value; - } + return "Ok" + else + await Entity.insertMany([ent]) + let urlValue; + if ( + ent[attrWithUrl] && + typeof ent[attrWithUrl] === "object" && + "value" in ent[attrWithUrl] + ) { + urlValue = ent[attrWithUrl].value; + } else if (ent[attrWithUrl]) { + urlValue = ent[attrWithUrl]; + } else if (ent[attrWithUrl + ":value"]) { + urlValue = ent[attrWithUrl + ":value"]; + } else if (ent.value) { + urlValue = ent.value; + } - if (!urlValue || typeof urlValue !== "string") { - console.warn(`no URL found for entity ${id}`); - continue; - } + if (!urlValue || typeof urlValue !== "string") { + console.warn(`no URL found for entity ${id}`); + continue; + } - let mapID = - req.query.mapID || req.params.mapID || ent.mapID || config.mapID; + let mapID = + req.query.mapID || req.params.mapID || ent.mapID || config.mapID; - if (!mapID) { - const response = await axios.get(urlValue); - if (response?.data?.data?.datapoints) - await Datapoints.insertMany(response.data.data.datapoints); - else - await minioWriter.insertInDBs(response.data, { - name: id + "-" + path.basename(new URL(urlValue).pathname), - lastModified: new Date(), - versionId: "null", - isDeleteMarker: false, - bucketName: "orion-notify", - size: response.data.length, - isLatest: true, - etag: "", - insertedBy: "orion-notify", - }); - } else { - let response; - let retry = 2; - while (retry > 0) - try { - response = await axios.post( - config.mapEndpoint, - { - sourceDataType: "json", - sourceDataURL: urlValue, - decodeOptions: { - decodeFrom: "json-stat", - }, - config: { - NGSI_entity: false, - ignoreValidation: true, - writers: [], - disableAjv: true, - mappingReport: true, - }, - dataModel: { - $schema: "http://json-schema.org/schema#", - $id: "dataModels/DataModelTemp.json", - title: "DataModelTemp", - description: "Bike Hire Docking Station", - type: "object", - properties: { - region: { - type: "string", - }, - source: { - type: "string", - }, - timestamp: { - type: "string", - }, - survey: { - type: "string", - }, - dimensions: { - type: "object", - }, - value: { - type: "integer", - }, + if (!mapID) { + const response = await axios.get(urlValue); + if (response?.data?.data?.datapoints) + await Datapoints.insertMany(response.data.data.datapoints); + else + await minioWriter.insertInDBs(response.data, { + name: id + "-" + path.basename(new URL(urlValue).pathname), + lastModified: new Date(), + versionId: "null", + isDeleteMarker: false, + bucketName: "orion-notify", + size: response.data.length, + isLatest: true, + etag: "", + insertedBy: "orion-notify", + }); + } else { + let response; + let retry = 2; + while (retry > 0) + try { + response = await axios.post( + config.mapEndpoint, + { + sourceDataType: "json", + sourceDataURL: urlValue, + decodeOptions: { + decodeFrom: "json-stat", + }, + config: { + NGSI_entity: false, + ignoreValidation: true, + writers: [], + disableAjv: true, + mappingReport: true, + }, + dataModel: { + $schema: "http://json-schema.org/schema#", + $id: "dataModels/DataModelTemp.json", + title: "DataModelTemp", + description: "Bike Hire Docking Station", + type: "object", + properties: { + region: { + type: "string", + }, + source: { + type: "string", + }, + timestamp: { + type: "string", + }, + survey: { + type: "string", + }, + dimensions: { + type: "object", + }, + value: { + type: "integer", }, }, - } /* + }, + } /* { //mapID, sourceDataURL: urlValue }*/, - { + { + headers: { + Authorization: `Bearer ${bearerToken}`, + }, + } + ); + retry -= 2; + try { + logger.info("Inserting datapoints into DB..."); + logger.info(response.data); + let outputId = response.data[response.data.length - 1].MAPPING_REPORT.outputId + let lastId + let purged = false + for (let chunkIndex = 0; (response.data[0] || response.data.id); chunkIndex++) { // Loop per gestire i chunk + //while (response.data[0] || response.data.id) { + logger.info(response.data.status) + response = await axios.get((config.sessionEndpoint || "http://localhost:5500/api/output?") + "id=" + outputId + "&lastId=" + lastId + "&index=" + chunkIndex, { headers: { - Authorization: `Bearer ${bearerToken}`, - }, - } - ); - retry -= 2; - try { - logger.info("Inserting datapoints into DB..."); - logger.info(response.data); - let outputId = response.data[response.data.length - 1].MAPPING_REPORT.outputId - let lastId - let purged = false - for (let chunkIndex = 0; (response.data[0] || response.data.id); chunkIndex++) { // Loop per gestire i chunk - //while (response.data[0] || response.data.id) { - logger.info(response.data.status) - response = await axios.get((config.sessionEndpoint || "http://localhost:5500/api/output?") + "id=" + outputId + "&lastId=" + lastId + "&index=" + chunkIndex, { - headers: { - Authorization: `Bearer ${bearerToken}` - } - }) - if (response.data[0]) { - if (!purged && !config.upsertRecords) - await Datapoints.deleteMany({ survey: response.data[0].survey }); // Cancellazione preliminare - const dataToInsert = response.data.map((d) => { // Preparazione dei dati - return { - ...d, - fromUrl: urlValue, - }; - }); - if (config.upsertRecords) - await Datapoints.upsertMany(dataToInsert); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion - else - await Datapoints.insertMany(dataToInsert) - lastId = response.data[response.data.length - 1]?._id // Gestione indici per il prossimo loop - purged = true; - const surveyKey = dataToInsert[0].survey.toUpperCase().replace(/\./g, ""); - const dimensionsFound = await Dimensions.findOne({ survey: surveyKey }); // direttamente un singolo documento - const uniqueKeys = new Set(); - for (const obj of dataToInsert) { - for (const key in obj.dimensions) { - uniqueKeys.add(key); - } + Authorization: `Bearer ${bearerToken}` + } + }) + if (response.data[0]) { + if (!purged && !config.upsertRecords) + await Datapoints.deleteMany({ survey: response.data[0].survey }); // Cancellazione preliminare + const dataToInsert = response.data.map((d) => { // Preparazione dei dati + return { + ...d, + fromUrl: urlValue, + }; + }); + if (config.upsertRecords) + await Datapoints.upsertMany(dataToInsert); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion + else + await Datapoints.insertMany(dataToInsert) + lastId = response.data[response.data.length - 1]?._id // Gestione indici per il prossimo loop + purged = true; + const surveyKey = dataToInsert[0].survey.toUpperCase().replace(/\./g, ""); + const dimensionsFound = await Dimensions.findOne({ survey: surveyKey }); // direttamente un singolo documento + const uniqueKeys = new Set(); + for (const obj of dataToInsert) { + for (const key in obj.dimensions) { + uniqueKeys.add(key); } - if (dimensionsFound) { - for (const key in dimensionsFound.dimensions) { - uniqueKeys.add(key); - } + } + if (dimensionsFound) { + for (const key in dimensionsFound.dimensions) { + uniqueKeys.add(key); } - let dimensionsObject = {} - for (let key of Array.from(uniqueKeys)) - dimensionsObject[key] = true - const dimensionObject = { - dimensions: dimensionsObject, - survey: surveyKey - }; - await Dimensions.findOneAndUpdate( - { survey: surveyKey }, - dimensionObject, - { upsert: true, new: true } - ); - logger.debug("Dimension object saved/updated:", dimensionObject); } + let dimensionsObject = {} + for (let key of Array.from(uniqueKeys)) + dimensionsObject[key] = true + const dimensionObject = { + dimensions: dimensionsObject, + survey: surveyKey + }; + await Dimensions.findOneAndUpdate( + { survey: surveyKey }, + dimensionObject, + { upsert: true, new: true } + ); + logger.debug("Dimension object saved/updated:", dimensionObject); } - } catch (error) { - logger.error("Error inserting datapoints:", error); } } catch (error) { - logger.error( - "Error fetching mapped data from API Connector:", - error.response?.data || error.message - ); - try { - bearerToken = await updateJWT(true); - retry--; - } catch (e) { - logger.error("Error updating JWT:", e); - retry--; - } + logger.error("Error inserting datapoints:", error); } - //logger.info(response.data.lenght) + } catch (error) { + logger.error( + "Error fetching mapped data from API Connector:", + error.response?.data || error.message + ); + try { + bearerToken = await updateJWT(true); + retry--; + } catch (e) { + logger.error("Error updating JWT:", e); + retry--; + } + } + //logger.info(response.data.lenght) + + /*for (let i in response.data) + await minioWriter.insertInDBs(response.data[i], { + name: response.data[i].id || mapID + '-' + path.basename((new URL(urlValue)).pathname) + i, + lastModified: new Date(), + versionId: 'null', + isDeleteMarker: false, + bucketName: 'orion-notify', + size: response.data.length, + isLatest: true, + etag: '', + insertedBy: 'orion-notify' + });*/ + } + logger.info(`downloaded ${urlValue}`); + } + return "OK"; +} - /*for (let i in response.data) - await minioWriter.insertInDBs(response.data[i], { - name: response.data[i].id || mapID + '-' + path.basename((new URL(urlValue)).pathname) + i, - lastModified: new Date(), - versionId: 'null', - isDeleteMarker: false, - bucketName: 'orion-notify', - size: response.data.length, - isLatest: true, - etag: '', - insertedBy: 'orion-notify' - });*/ - } - logger.info(`downloaded ${urlValue}`); +module.exports = { + notifyPath: async (req, res) => { + let turn = requestStack.length + requestStack.push([req, res]) + while (turn && requestStack.length > turn) + await sleep(100) + try { + await executeRequest(...requestStack[0]) + } + catch (error) { + logger.error(error) + return error } - return "OK"; + requestStack.shift() }, sync: minioWriter.sync, From a3b3eb2d164bf20a393655c5260847f1f839ac5b Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Thu, 12 Mar 2026 19:31:40 +0100 Subject: [PATCH 05/19] build docker image --- .github/workflows/build-backend.yml | 37 +++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 .github/workflows/build-backend.yml diff --git a/.github/workflows/build-backend.yml b/.github/workflows/build-backend.yml new file mode 100644 index 0000000..1a5819a --- /dev/null +++ b/.github/workflows/build-backend.yml @@ -0,0 +1,37 @@ +name: Build Backend + +on: + push: + branches: + - main + - build + pull_request: + branches: + - main + - build + +jobs: + + build-backend: + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Log in to Docker Hub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + context: . + file: ./Dockerfile + push: true + tags: ${{ secrets.DOCKER_USERNAME }}/source_connector:latest From 74917cab8cf19617a72cefa4835fdac34da954ab Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Thu, 12 Mar 2026 19:49:37 +0100 Subject: [PATCH 06/19] handle errors and manual sync --- api/services/service.js | 9 +++++++-- utils/common.js | 37 +++++++++++++++++++++---------------- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/api/services/service.js b/api/services/service.js index c7fb6bb..524a4f8 100644 --- a/api/services/service.js +++ b/api/services/service.js @@ -9,7 +9,7 @@ const fs = require("fs"); const { updateJWT } = require("../../utils/keycloak"); let bearerToken; const Entity = require("../models/Entity") -const { sleep } = require("../../utils/common") +const { sleep, verifyLostSubscription } = require("../../utils/common") updateJWT() .then((token) => { bearerToken = token; @@ -246,5 +246,10 @@ module.exports = { requestStack.shift() }, - sync: minioWriter.sync, + sync() { + if (config.sourceConnectors.minioConnector) + minioWriter.sync() + if (config.sourceConnectors.apiConnector) + verifyLostSubscription() + }, }; diff --git a/utils/common.js b/utils/common.js index a9b69d6..2c089e4 100644 --- a/utils/common.js +++ b/utils/common.js @@ -68,23 +68,28 @@ function syncEntries(obj, visibility, entries) { module.exports = { async verifyLostSubscription() { - //la seguente riga non funzionerà . Probabilmente bisogna chiamare l'ngsi broker - //let entities = await axios.get(config.orion.orionBaseUrl + apiConnector.getEndpointVersionApi().split("subscriptions") + "/entities?type=DistributionDCAT-AP") - let entities = await axios.get(config.orion.ngsiBrokerBaseUrl + "/api/distributiondcatap") - for (let ent of entities) { - const existingEntity = await Entity.findOne(ent.id) - if (existingEntity && existingEntity.modifiedDate.value["@value"] != ent.modifiedDate["@value"]) - await axios.post("http://localhost:" + config.port || 3000 + "/api/orion/subscribe/6914a252ddb96948ee67b2e1", { - "id": "self", - "type": "Notification", - "subscriptionId": "self", - "notifiedAt": Date.now(), - "data": [ - ent - ] - }) + try { + //la seguente riga non funzionerà . Probabilmente bisogna chiamare l'ngsi broker + //let entities = await axios.get(config.orion.orionBaseUrl + apiConnector.getEndpointVersionApi().split("subscriptions") + "/entities?type=DistributionDCAT-AP") + let entities = await axios.get(config.orion.ngsiBrokerBaseUrl + "/api/distributiondcatap") + for (let ent of entities) { + const existingEntity = await Entity.findOne(ent.id) + if (existingEntity && existingEntity.modifiedDate.value["@value"] != ent.modifiedDate["@value"]) + await axios.post("http://localhost:" + config.port || 3000 + "/api/orion/subscribe/6914a252ddb96948ee67b2e1", { + "id": "self", + "type": "Notification", + "subscriptionId": "self", + "notifiedAt": Date.now(), + "data": [ + ent + ] + }) + } + logger.info("Lost subscription verified") + } + catch (error) { + logger.error(error) } - logger.info("Lost subscription verified") }, sleep(ms) { From 033a5f3773f210a91d894606245d1699e5a6c16f Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Thu, 12 Mar 2026 19:55:53 +0100 Subject: [PATCH 07/19] update dockerfile --- Dockerfile | 1 + token.template.js | 1 + utils/common.js | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) create mode 100644 token.template.js diff --git a/Dockerfile b/Dockerfile index 591f513..d30d229 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,4 +3,5 @@ WORKDIR /app COPY . . #COPY ../utils ../utils RUN npm install +COPY ./token.template.js ./token.js CMD ["node", "index"] \ No newline at end of file diff --git a/token.template.js b/token.template.js new file mode 100644 index 0000000..6ccd301 --- /dev/null +++ b/token.template.js @@ -0,0 +1 @@ +module.exports = "" \ No newline at end of file diff --git a/utils/common.js b/utils/common.js index 2c089e4..72022b4 100644 --- a/utils/common.js +++ b/utils/common.js @@ -88,7 +88,7 @@ module.exports = { logger.info("Lost subscription verified") } catch (error) { - logger.error(error) + logger.error(error.response.message, error.respose.status) } }, From 00f0f30d2484d8a862874aa2032e105aa3c82095 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Thu, 12 Mar 2026 19:59:27 +0100 Subject: [PATCH 08/19] fix bug with axios error --- utils/common.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/common.js b/utils/common.js index 72022b4..67d33fa 100644 --- a/utils/common.js +++ b/utils/common.js @@ -88,7 +88,7 @@ module.exports = { logger.info("Lost subscription verified") } catch (error) { - logger.error(error.response.message, error.respose.status) + logger.error(error.response?.message, error.response?.status, error.response?.statusText, error.response?.data) } }, From b3f9a476449acffb982c3df302a6059add85db64 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Thu, 12 Mar 2026 20:07:20 +0100 Subject: [PATCH 09/19] better error handling --- inputConnectors/apiConnector.js | 24 +++++++++++++++--------- utils/common.js | 7 ++++++- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/inputConnectors/apiConnector.js b/inputConnectors/apiConnector.js index 22af779..1ab9632 100644 --- a/inputConnectors/apiConnector.js +++ b/inputConnectors/apiConnector.js @@ -67,7 +67,13 @@ if (config.orion.subscribe) if (sub != "Already existing subscription found for the same notification URL.") logger.info("Orion subscription created: " + sub) }).catch(err => { - logger.error("Error creating Orion subscription: ", err.response?.data || err.message || err) + logger.error("Error creating Orion subscription: ")//, err.response?.data || err.message || err) + logger.error({ + config : err.config, + status: err.response?.status, + ststusText: err.response?.statusText, + data: err.response?.data + }) err.response?.config?.data && logger.error(err.response?.config?.data) }) @@ -88,16 +94,16 @@ async function getSubscriptions() { async function deleteSubscription(subId) { return (await axios.delete(`${(config.orion.orionBaseUrl || 'http://localhost:1027')}${getEndpointVersionApi(subId)}/${subId}`, (config?.orion?.fiwareService ? + { + headers: { - headers: - { - 'Fiware-Service': config.orion.fiwareService || 'service', - 'Fiware-ServicePath': config.orion.fiwareServicePath || '/service' - } + 'Fiware-Service': config.orion.fiwareService || 'service', + 'Fiware-ServicePath': config.orion.fiwareServicePath || '/service' } - : - {} - ))).data + } + : + {} + ))).data } function typesCheck(subTypes) { diff --git a/utils/common.js b/utils/common.js index 67d33fa..112a780 100644 --- a/utils/common.js +++ b/utils/common.js @@ -88,7 +88,12 @@ module.exports = { logger.info("Lost subscription verified") } catch (error) { - logger.error(error.response?.message, error.response?.status, error.response?.statusText, error.response?.data) + logger.error({ + config : errror.config, + status: error.response?.status, + ststusText: error.response?.statusText, + data: error.response?.data + }) } }, From 92650025de17ac84f497cf380cd11c550e039f2f Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Thu, 12 Mar 2026 20:09:53 +0100 Subject: [PATCH 10/19] typo --- utils/common.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/common.js b/utils/common.js index 112a780..bdb7db1 100644 --- a/utils/common.js +++ b/utils/common.js @@ -89,7 +89,7 @@ module.exports = { } catch (error) { logger.error({ - config : errror.config, + config : error.config, status: error.response?.status, ststusText: error.response?.statusText, data: error.response?.data From c68fc15edb2384e8a0af31a38c8863eb90d000d2 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Thu, 12 Mar 2026 20:15:33 +0100 Subject: [PATCH 11/19] fix strange bug --- utils/common.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/utils/common.js b/utils/common.js index bdb7db1..217d1c0 100644 --- a/utils/common.js +++ b/utils/common.js @@ -71,7 +71,8 @@ module.exports = { try { //la seguente riga non funzionerà . Probabilmente bisogna chiamare l'ngsi broker //let entities = await axios.get(config.orion.orionBaseUrl + apiConnector.getEndpointVersionApi().split("subscriptions") + "/entities?type=DistributionDCAT-AP") - let entities = await axios.get(config.orion.ngsiBrokerBaseUrl + "/api/distributiondcatap") + let ngsiBrokerUrl = (config.orion.ngsiBrokerBaseUrl + "/api/distributiondcatap").replaceAll("//", "/") + let entities = await axios.get(ngsiBrokerUrl) for (let ent of entities) { const existingEntity = await Entity.findOne(ent.id) if (existingEntity && existingEntity.modifiedDate.value["@value"] != ent.modifiedDate["@value"]) From 2ea5d373cbabc5c1a7cc14ae36553333d0b91f5a Mon Sep 17 00:00:00 2001 From: Gabriele Percoco <114912675+Percocco@users.noreply.github.com> Date: Tue, 17 Mar 2026 12:41:47 +0100 Subject: [PATCH 12/19] fix type --- package-lock.json | 50 +++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/package-lock.json b/package-lock.json index 1d639fc..26de362 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,7 +15,7 @@ "dotenv": "^16.4.5", "express": "^4.19.2", "ioredis": "^5.3.2", - "jsonwebtoken": "^9.0.2", + "jsonwebtoken": "^9.0.3", "minio": "^8.0.0", "mongoose": "^8.3.4", "nodemon": "^3.1.0", @@ -1625,11 +1625,11 @@ "integrity": "sha512-xHjhDr3cNBK0BzdUJSPXZntQUx/mwMS5Rw4A7lPJ90XGAO6ISP/ePDNuo0vhqOZU+UD5JoodwCAAoZQd3FeAKw==" }, "node_modules/jsonwebtoken": { - "version": "9.0.2", - "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.2.tgz", - "integrity": "sha512-PRp66vJ865SSqOlgqS8hujT5U4AOgMfhrwYIuIhfKaoSCZcirrmASQr8CX7cUg+RMih+hgznrjp99o+W4pJLHQ==", + "version": "9.0.3", + "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.3.tgz", + "integrity": "sha512-MT/xP0CrubFRNLNKvxJ2BYfy53Zkm++5bX9dtuPbqAeQpTVe0MQTFhao8+Cp//EmJp244xt6Drw/GVEGCUj40g==", "dependencies": { - "jws": "^3.2.2", + "jws": "^4.0.1", "lodash.includes": "^4.3.0", "lodash.isboolean": "^3.0.3", "lodash.isinteger": "^4.0.4", @@ -1651,21 +1651,21 @@ "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" }, "node_modules/jwa": { - "version": "1.4.1", - "resolved": "https://registry.npmjs.org/jwa/-/jwa-1.4.1.tgz", - "integrity": "sha512-qiLX/xhEEFKUAJ6FiBMbes3w9ATzyk5W7Hvzpa/SLYdxNtng+gcurvrI7TbACjIXlsJyr05/S1oUhZrc63evQA==", + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/jwa/-/jwa-2.0.1.tgz", + "integrity": "sha512-hRF04fqJIP8Abbkq5NKGN0Bbr3JxlQ+qhZufXVr0DvujKy93ZCbXZMHDL4EOtodSbCWxOqR8MS1tXA5hwqCXDg==", "dependencies": { - "buffer-equal-constant-time": "1.0.1", + "buffer-equal-constant-time": "^1.0.1", "ecdsa-sig-formatter": "1.0.11", "safe-buffer": "^5.0.1" } }, "node_modules/jws": { - "version": "3.2.2", - "resolved": "https://registry.npmjs.org/jws/-/jws-3.2.2.tgz", - "integrity": "sha512-YHlZCB6lMTllWDtSPHz/ZXTsi8S00usEV6v1tjq8tOUZzw7DpSDWVXjXDre6ed1w/pd495ODpHZYSdkRTsa0HA==", + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/jws/-/jws-4.0.1.tgz", + "integrity": "sha512-EKI/M/yqPncGUUh44xz0PxSidXFr/+r0pA70+gIYhjv+et7yxM+s29Y+VGDkovRofQem0fs7Uvf4+YmAdyRduA==", "dependencies": { - "jwa": "^1.4.1", + "jwa": "^2.0.1", "safe-buffer": "^5.0.1" } }, @@ -4203,11 +4203,11 @@ "integrity": "sha512-xHjhDr3cNBK0BzdUJSPXZntQUx/mwMS5Rw4A7lPJ90XGAO6ISP/ePDNuo0vhqOZU+UD5JoodwCAAoZQd3FeAKw==" }, "jsonwebtoken": { - "version": "9.0.2", - "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.2.tgz", - "integrity": "sha512-PRp66vJ865SSqOlgqS8hujT5U4AOgMfhrwYIuIhfKaoSCZcirrmASQr8CX7cUg+RMih+hgznrjp99o+W4pJLHQ==", + "version": "9.0.3", + "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.3.tgz", + "integrity": "sha512-MT/xP0CrubFRNLNKvxJ2BYfy53Zkm++5bX9dtuPbqAeQpTVe0MQTFhao8+Cp//EmJp244xt6Drw/GVEGCUj40g==", "requires": { - "jws": "^3.2.2", + "jws": "^4.0.1", "lodash.includes": "^4.3.0", "lodash.isboolean": "^3.0.3", "lodash.isinteger": "^4.0.4", @@ -4227,21 +4227,21 @@ } }, "jwa": { - "version": "1.4.1", - "resolved": "https://registry.npmjs.org/jwa/-/jwa-1.4.1.tgz", - "integrity": "sha512-qiLX/xhEEFKUAJ6FiBMbes3w9ATzyk5W7Hvzpa/SLYdxNtng+gcurvrI7TbACjIXlsJyr05/S1oUhZrc63evQA==", + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/jwa/-/jwa-2.0.1.tgz", + "integrity": "sha512-hRF04fqJIP8Abbkq5NKGN0Bbr3JxlQ+qhZufXVr0DvujKy93ZCbXZMHDL4EOtodSbCWxOqR8MS1tXA5hwqCXDg==", "requires": { - "buffer-equal-constant-time": "1.0.1", + "buffer-equal-constant-time": "^1.0.1", "ecdsa-sig-formatter": "1.0.11", "safe-buffer": "^5.0.1" } }, "jws": { - "version": "3.2.2", - "resolved": "https://registry.npmjs.org/jws/-/jws-3.2.2.tgz", - "integrity": "sha512-YHlZCB6lMTllWDtSPHz/ZXTsi8S00usEV6v1tjq8tOUZzw7DpSDWVXjXDre6ed1w/pd495ODpHZYSdkRTsa0HA==", + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/jws/-/jws-4.0.1.tgz", + "integrity": "sha512-EKI/M/yqPncGUUh44xz0PxSidXFr/+r0pA70+gIYhjv+et7yxM+s29Y+VGDkovRofQem0fs7Uvf4+YmAdyRduA==", "requires": { - "jwa": "^1.4.1", + "jwa": "^2.0.1", "safe-buffer": "^5.0.1" } }, From 567479cbe31af513b4dbedc56f2660202e86b9db Mon Sep 17 00:00:00 2001 From: DEMETRIXASUSgabri Date: Tue, 7 Apr 2026 11:46:23 +0200 Subject: [PATCH 13/19] orion fix --- inputConnectors/apiConnector.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inputConnectors/apiConnector.js b/inputConnectors/apiConnector.js index 1ab9632..b8df278 100644 --- a/inputConnectors/apiConnector.js +++ b/inputConnectors/apiConnector.js @@ -39,7 +39,7 @@ async function createOrionSubscription({ notification: { endpoint: { uri: config.orion.notificationUrl, - accept: "application/json" + Accept: 'application/ld+json, application/json' } }, throttling: 5, From d815b8ac1f5f7fbc5757dc9fbc7ed272a5111675 Mon Sep 17 00:00:00 2001 From: DEMETRIXASUSgabri Date: Tue, 7 Apr 2026 12:08:28 +0200 Subject: [PATCH 14/19] fix orion --- inputConnectors/apiConnector.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/inputConnectors/apiConnector.js b/inputConnectors/apiConnector.js index b8df278..83066c3 100644 --- a/inputConnectors/apiConnector.js +++ b/inputConnectors/apiConnector.js @@ -39,14 +39,14 @@ async function createOrionSubscription({ notification: { endpoint: { uri: config.orion.notificationUrl, - Accept: 'application/ld+json, application/json' + accept: "application/json" } }, throttling: 5, expires: new Date(new Date().getTime() + 365 * 24 * 60 * 60 * 1000).toISOString(), } - const headers = { 'Content-Type': 'application/json' }; + const headers = { 'Content-Type': 'application/json', 'Accept': '*/*' }; if (fiwareService) headers['Fiware-Service'] = fiwareService; if (fiwareServicePath) headers['Fiware-ServicePath'] = fiwareServicePath; From ce5ea9ebc7f27b0bd712a709c55a0a3d5d1722bf Mon Sep 17 00:00:00 2001 From: DEMETRIXASUSgabri Date: Tue, 7 Apr 2026 12:17:53 +0200 Subject: [PATCH 15/19] orion fix --- inputConnectors/apiConnector.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inputConnectors/apiConnector.js b/inputConnectors/apiConnector.js index 83066c3..dec2faa 100644 --- a/inputConnectors/apiConnector.js +++ b/inputConnectors/apiConnector.js @@ -46,7 +46,7 @@ async function createOrionSubscription({ expires: new Date(new Date().getTime() + 365 * 24 * 60 * 60 * 1000).toISOString(), } - const headers = { 'Content-Type': 'application/json', 'Accept': '*/*' }; + const headers = { 'Content-Type': 'application/json', 'Accept': config.orion.apiVersion === "v2" ? 'application/json' : 'application/ld+json' }; if (fiwareService) headers['Fiware-Service'] = fiwareService; if (fiwareServicePath) headers['Fiware-ServicePath'] = fiwareServicePath; From 6994e111199054dc7b47f0e50b8c5693c9dbdc93 Mon Sep 17 00:00:00 2001 From: DEMETRIXASUSgabri Date: Tue, 7 Apr 2026 17:16:20 +0200 Subject: [PATCH 16/19] no axios --- api/controllers/controller.js | 4 +-- config.template.js | 5 +++- inputConnectors/apiConnector.js | 53 +++++++++++++++++++++++++++------ package-lock.json | 2 +- package.json | 2 +- 5 files changed, 52 insertions(+), 14 deletions(-) diff --git a/api/controllers/controller.js b/api/controllers/controller.js index a448ceb..409ac17 100644 --- a/api/controllers/controller.js +++ b/api/controllers/controller.js @@ -14,10 +14,10 @@ module.exports = { logger.info("Notification received") try { let response = await service.notifyPath(req, res) - if (typeof response == string) + if (typeof response == "string") return res.send(response) else - res.status(500).send(response.toString() == "[object Object]" ? response : response.toString()) + res.status(500).send(response?.toString() == "[object Object]" ? response : response.toString()) } catch (error) { logger.error(error) diff --git a/config.template.js b/config.template.js index 63e5201..8d65053 100644 --- a/config.template.js +++ b/config.template.js @@ -27,10 +27,13 @@ module.exports = { sessionEndpoint: "http://localhost:5500/api/output?", mapID: "", orion: { + protocol : "http", subscribe: true, deleteAllDuplicateSubscriptions: true, attrWithUrl: "datasetUrl", - orionBaseUrl: "http://localhost:1027", + orionBaseUrl: "http://localhost:1026", + hostname : "localhost", + port: 1026, ngsiBrokerBaseUrl: "https://dx-lab.it/", notificationUrl: "http://localhost:3000/api/orion/subscribe", fiwareService: "", diff --git a/inputConnectors/apiConnector.js b/inputConnectors/apiConnector.js index dec2faa..19e15a7 100644 --- a/inputConnectors/apiConnector.js +++ b/inputConnectors/apiConnector.js @@ -2,6 +2,7 @@ const express = require('express'); const axios = require('axios'); const logger = require('percocologger') const config = require('../config.js'); +const protocol = require(config.orion.protocol || 'http') function getEndpointVersionApi(subId) { return (config.orion.apiVersion == "v2" || (subId && !subId.startsWith("urn:ngsi-ld:Subscription:")) ? "/v2/subscriptions" : "/ngsi-ld/v1/subscriptions") @@ -46,15 +47,49 @@ async function createOrionSubscription({ expires: new Date(new Date().getTime() + 365 * 24 * 60 * 60 * 1000).toISOString(), } - const headers = { 'Content-Type': 'application/json', 'Accept': config.orion.apiVersion === "v2" ? 'application/json' : 'application/ld+json' }; - if (fiwareService) headers['Fiware-Service'] = fiwareService; - if (fiwareServicePath) headers['Fiware-ServicePath'] = fiwareServicePath; + const postData = JSON.stringify(sub); - const url = `${orionBaseUrl.replace(/\/$/, '')}${getEndpointVersionApi()}`; - logger.info(url, sub, { headers }) - const res = await axios.post(url, sub, { headers }); - logger.info({ status: res.status }) - return res.data; + const options = { + hostname: orionBaseUrl.replace(/\/$/, ''), + port: config.orion.port, + path: getEndpointVersionApi(), + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'Content-Length': Buffer.byteLength(postData), + } + }; + if (fiwareService) options.headers['Fiware-Service'] = fiwareService; + if (fiwareServicePath) options.headers['Fiware-ServicePath'] = fiwareServicePath; + + return new Promise((resolve, reject) => { + const req = protocol.request(options, (res) => { + let data = ''; + res.on('data', chunk => data += chunk); + res.on('end', () => { + if (res.statusCode >= 200 && res.statusCode < 300) { + resolve(data); // JSON.parse(data) se vuoi un oggetto + } else { + reject({ status: res.statusCode, body: data }); + } + }); + }); + + req.on('error', reject); + req.write(postData); + req.end(); + }); + + //const headers = { 'Content-Type': 'application/json', 'Accept': config.orion.apiVersion === "v2" ? 'application/json' : 'application/ld+json' }; + //if (fiwareService) headers['Fiware-Service'] = fiwareService; + //if (fiwareServicePath) headers['Fiware-ServicePath'] = fiwareServicePath; + + //const url = `${orionBaseUrl.replace(/\/$/, '')}${getEndpointVersionApi()}`; + //logger.info(url, sub, { headers }) + //const res = await axios.post(url, sub, { headers }); + //logger.info({ status: res.status }) + //return res.data; } if (config.orion.subscribe) @@ -69,7 +104,7 @@ if (config.orion.subscribe) }).catch(err => { logger.error("Error creating Orion subscription: ")//, err.response?.data || err.message || err) logger.error({ - config : err.config, + config: err.config, status: err.response?.status, ststusText: err.response?.statusText, data: err.response?.data diff --git a/package-lock.json b/package-lock.json index 26de362..8dc7f12 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,7 +10,7 @@ "license": "ISC", "dependencies": { "apollo-server-express": "^3.13.0", - "axios": "^1.7.2", + "axios": "1.7.2", "cors": "^2.8.5", "dotenv": "^16.4.5", "express": "^4.19.2", diff --git a/package.json b/package.json index a429007..045b234 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,7 @@ "license": "ISC", "dependencies": { "apollo-server-express": "^3.13.0", - "axios": "^1.7.2", + "axios": "1.7.2", "cors": "^2.8.5", "dotenv": "^16.4.5", "express": "^4.19.2", From 857883182cb0dec662e2af509397c3176e7d8dd1 Mon Sep 17 00:00:00 2001 From: DEMETRIXASUSgabri Date: Mon, 20 Apr 2026 16:54:00 +0200 Subject: [PATCH 17/19] fix bug --- api/services/service.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/api/services/service.js b/api/services/service.js index 524a4f8..72b182a 100644 --- a/api/services/service.js +++ b/api/services/service.js @@ -233,17 +233,19 @@ async function executeRequest(req, res) { module.exports = { notifyPath: async (req, res) => { let turn = requestStack.length + let result requestStack.push([req, res]) while (turn && requestStack.length > turn) await sleep(100) try { - await executeRequest(...requestStack[0]) + result = await executeRequest(...requestStack[0]) } catch (error) { logger.error(error) return error } requestStack.shift() + return result }, sync() { From 97d2c3b09848a0b2ac16cf2ab8b4603f5574d52d Mon Sep 17 00:00:00 2001 From: DEMETRIXASUSgabri Date: Tue, 21 Apr 2026 18:42:15 +0200 Subject: [PATCH 18/19] verify lost subscription works, queue endpoint added --- api/controllers/controller.js | 6 ++++++ api/routes/router.js | 1 + api/services/service.js | 11 +++++++--- inputConnectors/apiConnector.js | 16 +++++++++++++-- utils/common.js | 36 ++++++++++++++++----------------- 5 files changed, 47 insertions(+), 23 deletions(-) diff --git a/api/controllers/controller.js b/api/controllers/controller.js index 409ac17..76c98a1 100644 --- a/api/controllers/controller.js +++ b/api/controllers/controller.js @@ -23,5 +23,11 @@ module.exports = { logger.error(error) res.status(500).send(error.toString() == "[object Object]" ? error : error.toString()) } + }, + + queue : async (req, res) => { + logger.info("Queue") + return await res.send(await service.queue()) } + } \ No newline at end of file diff --git a/api/routes/router.js b/api/routes/router.js index fe23623..4b1c3b6 100644 --- a/api/routes/router.js +++ b/api/routes/router.js @@ -5,5 +5,6 @@ const { auth } = require("../middlewares/auth.js") router.post(encodeURI("/orion/subscribe/:mapID"), auth, controller.notifyPath) router.put(encodeURI("/query"), auth, controller.sync) +router.get(encodeURI("/queue"), auth, controller.queue) module.exports = router diff --git a/api/services/service.js b/api/services/service.js index 72b182a..2610be6 100644 --- a/api/services/service.js +++ b/api/services/service.js @@ -30,8 +30,8 @@ async function executeRequest(req, res) { for (const ent of entities) { const id = ent.id || ent["@id"] || "unknown-id"; - const modifiedDate = ent.modifiedDate.value["@value"] - let existingEntity = await Entity.findOne({ id }) + const modifiedDate = ent.modifiedDate?.value["@value"] + let existingEntity = await Entity.findOne({ id })//TODO this way different id but same URL will be duplicated; fix if (existingEntity) if (existingEntity.modifiedDate["@value"] != modifiedDate) await Entity.findOneAndUpdate(ent) @@ -45,7 +45,7 @@ async function executeRequest(req, res) { typeof ent[attrWithUrl] === "object" && "value" in ent[attrWithUrl] ) { - urlValue = ent[attrWithUrl].value; + urlValue = ent[attrWithUrl].value || ent[attrWithUrl]; } else if (ent[attrWithUrl]) { urlValue = ent[attrWithUrl]; } else if (ent[attrWithUrl + ":value"]) { @@ -231,6 +231,11 @@ async function executeRequest(req, res) { } module.exports = { + + queue() { + return requestStack.length + }, + notifyPath: async (req, res) => { let turn = requestStack.length let result diff --git a/inputConnectors/apiConnector.js b/inputConnectors/apiConnector.js index 0f2ddb0..7c1b7b2 100644 --- a/inputConnectors/apiConnector.js +++ b/inputConnectors/apiConnector.js @@ -7,6 +7,17 @@ function getEndpointVersionApi(subId) { return (config.orion.apiVersion == "v2" || (subId && !subId.startsWith("urn:ngsi-ld:Subscription:")) ? "/v2/subscriptions" : "/ngsi-ld/v1/subscriptions") } +function runSubscription() { + createOrionSubscription({ + orionBaseUrl: config.orion?.orionBaseUrl || 'http://localhost:1027', + notificationUrl: config.orion?.notificationUrl || 'http://host.docker.internal:3000/api/orion/subscribe', + fiwareService: config.orion?.fiwareService, + fiwareServicePath: config.orion?.fiwareServicePath + }); +} + +setInterval(runSubscription, 24 * 60 * 60 * 1000); + async function createOrionSubscription({ orionBaseUrl, notificationUrl, @@ -46,7 +57,7 @@ async function createOrionSubscription({ expires: new Date(new Date().getTime() + 365 * 24 * 60 * 60 * 1000).toISOString(), } - const headers = { 'Content-Type': 'application/json'}; + const headers = { 'Content-Type': 'application/json' }; if (fiwareService) headers['Fiware-Service'] = fiwareService; if (fiwareServicePath) headers['Fiware-ServicePath'] = fiwareServicePath; @@ -54,6 +65,7 @@ async function createOrionSubscription({ logger.info(url, sub, { headers }) const res = await axios.post(url, sub, { headers }); logger.info({ status: res.status }) + config.orion.purgeSubscriptionsAtStart = false;//only purge at start if there are duplicates, otherwise we can end in a loop of deleting and creating subscription if the orion instance is restarted while the query engine is restarting return res.data; } @@ -69,7 +81,7 @@ if (config.orion.subscribe) }).catch(err => { logger.error("Error creating Orion subscription: ")//, err.response?.data || err.message || err) logger.error({ - config : err.config, + config: err.config, status: err.response?.status, ststusText: err.response?.statusText, data: err.response?.data diff --git a/utils/common.js b/utils/common.js index 217d1c0..5c92037 100644 --- a/utils/common.js +++ b/utils/common.js @@ -72,29 +72,29 @@ module.exports = { //la seguente riga non funzionerà . Probabilmente bisogna chiamare l'ngsi broker //let entities = await axios.get(config.orion.orionBaseUrl + apiConnector.getEndpointVersionApi().split("subscriptions") + "/entities?type=DistributionDCAT-AP") let ngsiBrokerUrl = (config.orion.ngsiBrokerBaseUrl + "/api/distributiondcatap").replaceAll("//", "/") - let entities = await axios.get(ngsiBrokerUrl) + let entities = (await axios.get(ngsiBrokerUrl)).data for (let ent of entities) { - const existingEntity = await Entity.findOne(ent.id) - if (existingEntity && existingEntity.modifiedDate.value["@value"] != ent.modifiedDate["@value"]) - await axios.post("http://localhost:" + config.port || 3000 + "/api/orion/subscribe/6914a252ddb96948ee67b2e1", { - "id": "self", - "type": "Notification", - "subscriptionId": "self", - "notifiedAt": Date.now(), - "data": [ - ent - ] - }) + const existingEntity = await Entity.findOne({ id: ent.id }) + logger.info(ent) + if (!existingEntity || (existingEntity && existingEntity.modifiedDate?.value["@value"] != ent.modifiedDate["@value"])) + try { + await axios.post("http://localhost:" + (config.port || 3001) + "/api/orion/subscribe/6914a252ddb96948ee67b2e1", { + "id": "self", + "type": "Notification", + "subscriptionId": "self", + "notifiedAt": Date.now(), + "data": [ + ent + ] + }) + } catch (error) { + logger.error(error.response.data ? { axios: error.response.data } : error) + } } logger.info("Lost subscription verified") } catch (error) { - logger.error({ - config : error.config, - status: error.response?.status, - ststusText: error.response?.statusText, - data: error.response?.data - }) + logger.error(error.response.data ? { axios: error.response.data } : error) } }, From 61cb192a3949272362e8729ad8273462b8480060 Mon Sep 17 00:00:00 2001 From: DEMETRIXASUSgabri Date: Tue, 28 Apr 2026 16:55:15 +0200 Subject: [PATCH 19/19] fix bug --- utils/common.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils/common.js b/utils/common.js index 5c92037..00c37cb 100644 --- a/utils/common.js +++ b/utils/common.js @@ -88,13 +88,13 @@ module.exports = { ] }) } catch (error) { - logger.error(error.response.data ? { axios: error.response.data } : error) + logger.error(error.response?.data ? { axios: error.response.data } : error) } } logger.info("Lost subscription verified") } catch (error) { - logger.error(error.response.data ? { axios: error.response.data } : error) + logger.error(error.response?.data ? { axios: error.response.data } : error) } },