diff --git a/.github/workflows/build-backend.yml b/.github/workflows/build-backend.yml new file mode 100644 index 0000000..1a5819a --- /dev/null +++ b/.github/workflows/build-backend.yml @@ -0,0 +1,37 @@ +name: Build Backend + +on: + push: + branches: + - main + - build + pull_request: + branches: + - main + - build + +jobs: + + build-backend: + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Log in to Docker Hub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + context: . + file: ./Dockerfile + push: true + tags: ${{ secrets.DOCKER_USERNAME }}/source_connector:latest diff --git a/Dockerfile b/Dockerfile index 591f513..d30d229 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,4 +3,5 @@ WORKDIR /app COPY . . #COPY ../utils ../utils RUN npm install +COPY ./token.template.js ./token.js CMD ["node", "index"] \ No newline at end of file diff --git a/api/controllers/controller.js b/api/controllers/controller.js index e43c4b7..76c98a1 100644 --- a/api/controllers/controller.js +++ b/api/controllers/controller.js @@ -3,7 +3,7 @@ const logger = require('percocologger') module.exports = { - + sync: async (req, res) => { logger.info("Sync") @@ -13,11 +13,21 @@ module.exports = { notifyPath: async (req, res) => { logger.info("Notification received") try { - res.send(await service.notifyPath(req, res)) + let response = await service.notifyPath(req, res) + if (typeof response == "string") + return res.send(response) + else + res.status(500).send(response?.toString() == "[object Object]" ? response : response.toString()) } catch (error) { logger.error(error) res.status(500).send(error.toString() == "[object Object]" ? error : error.toString()) } + }, + + queue : async (req, res) => { + logger.info("Queue") + return await res.send(await service.queue()) } + } \ No newline at end of file diff --git a/api/models/Datapoint.js b/api/models/Datapoint.js index 4cb547e..706293c 100644 --- a/api/models/Datapoint.js +++ b/api/models/Datapoint.js @@ -1,5 +1,6 @@ const mongoose = require("mongoose"); const crypto = require("crypto"); +const config = require("../../config") // Funzione di utilità per pulire il nome della survey const cleanSurveyName = (val) => { @@ -17,7 +18,7 @@ const datapointSchema = new mongoose.Schema( timestamp: { type: String, index: true }, dimensions: { type: Object }, value: Number, - dupl_hash: { type: String }, // Identificatore unico per evitare duplicati + dupl_hash: { type: String, unique : config.upsertRecords == true }, // Identificatore unico per evitare duplicati }, { strict: false, @@ -37,11 +38,12 @@ datapointSchema.pre("save", function (next) { // Funzione per generare l'hash unico (usata nell'inserimento massivo) const generateHash = (doc) => { + return JSON.stringify(doc) // Gestiamo sia se arriva come documento Mongoose sia come oggetto puro const target = doc._doc || doc; const dims = target.dimensions || []; - const sortedDims = [...dims].sort(); + const sortedDims = [...dims].sort();//FIXME funziona solo con gli array, ci vorrebbe Object.entries() const sortedKeys = sortedDims.join("|"); const s = cleanSurveyName(target.survey); diff --git a/api/models/Dimensions.js b/api/models/Dimensions.js new file mode 100644 index 0000000..3c3cd60 --- /dev/null +++ b/api/models/Dimensions.js @@ -0,0 +1,6 @@ +const mongoose = require("mongoose"); + +const dimensions + = new mongoose.Schema({}, { strict: false, versionKey: false }); + +module.exports = mongoose.model("dimensions", dimensions); \ No newline at end of file diff --git a/api/models/Entity.js b/api/models/Entity.js new file mode 100644 index 0000000..09e7ca9 --- /dev/null +++ b/api/models/Entity.js @@ -0,0 +1,5 @@ +const mongoose = require("mongoose"); + +const entity = new mongoose.Schema({}, { strict: false, versionKey: false }) + +module.exports = mongoose.model("entities", entity); \ No newline at end of file diff --git a/api/routes/router.js b/api/routes/router.js index fe23623..4b1c3b6 100644 --- a/api/routes/router.js +++ b/api/routes/router.js @@ -5,5 +5,6 @@ const { auth } = require("../middlewares/auth.js") router.post(encodeURI("/orion/subscribe/:mapID"), auth, controller.notifyPath) router.put(encodeURI("/query"), auth, controller.sync) +router.get(encodeURI("/queue"), auth, controller.queue) module.exports = router diff --git a/api/services/service.js b/api/services/service.js index 5b32f16..2610be6 100644 --- a/api/services/service.js +++ b/api/services/service.js @@ -1,12 +1,15 @@ const logger = require("percocologger"); const log = logger.info; const Datapoints = require("../models/Datapoint"); +const Dimensions = require("../models/Dimensions"); const config = require("../../config"); const minioWriter = require("../../inputConnectors/minioConnector"); const axios = require("axios"); const fs = require("fs"); const { updateJWT } = require("../../utils/keycloak"); let bearerToken; +const Entity = require("../models/Entity") +const { sleep, verifyLostSubscription } = require("../../utils/common") updateJWT() .then((token) => { bearerToken = token; @@ -18,176 +21,242 @@ const path = require("path"); let attrWithUrl = config.orion?.attrWithUrl || "datasetUrl"; require("../../inputConnectors/apiConnector"); -module.exports = { - notifyPath: async (req, res) => { - logger.info({ body: JSON.stringify(req.body) }); +let requestStack = [] +async function executeRequest(req, res) { + logger.info({ body: JSON.stringify(req.body) }); - const data = req.body.data || req.body.value || req.body; - const entities = Array.isArray(data) ? data : [data]; + const data = req.body.data || req.body.value || req.body; + const entities = Array.isArray(data) ? data : [data]; - for (const ent of entities) { - const id = ent.id || ent["@id"] || "unknown-id"; - let urlValue; - if ( - ent[attrWithUrl] && - typeof ent[attrWithUrl] === "object" && - "value" in ent[attrWithUrl] - ) { - urlValue = ent[attrWithUrl].value; - } else if (ent[attrWithUrl]) { - urlValue = ent[attrWithUrl]; - } else if (ent[attrWithUrl + ":value"]) { - urlValue = ent[attrWithUrl + ":value"]; - } else if (ent.value) { - urlValue = ent.value; - } + for (const ent of entities) { + const id = ent.id || ent["@id"] || "unknown-id"; + const modifiedDate = ent.modifiedDate?.value["@value"] + let existingEntity = await Entity.findOne({ id })//TODO this way different id but same URL will be duplicated; fix + if (existingEntity) + if (existingEntity.modifiedDate["@value"] != modifiedDate) + await Entity.findOneAndUpdate(ent) + else + return "Ok" + else + await Entity.insertMany([ent]) + let urlValue; + if ( + ent[attrWithUrl] && + typeof ent[attrWithUrl] === "object" && + "value" in ent[attrWithUrl] + ) { + urlValue = ent[attrWithUrl].value || ent[attrWithUrl]; + } else if (ent[attrWithUrl]) { + urlValue = ent[attrWithUrl]; + } else if (ent[attrWithUrl + ":value"]) { + urlValue = ent[attrWithUrl + ":value"]; + } else if (ent.value) { + urlValue = ent.value; + } - if (!urlValue || typeof urlValue !== "string") { - console.warn(`no URL found for entity ${id}`); - continue; - } + if (!urlValue || typeof urlValue !== "string") { + console.warn(`no URL found for entity ${id}`); + continue; + } - let mapID = - req.query.mapID || req.params.mapID || ent.mapID || config.mapID; + let mapID = + req.query.mapID || req.params.mapID || ent.mapID || config.mapID; - if (!mapID) { - const response = await axios.get(urlValue); - if (response?.data?.data?.datapoints) - await Datapoints.insertMany(response.data.data.datapoints); - else - await minioWriter.insertInDBs(response.data, { - name: id + "-" + path.basename(new URL(urlValue).pathname), - lastModified: new Date(), - versionId: "null", - isDeleteMarker: false, - bucketName: "orion-notify", - size: response.data.length, - isLatest: true, - etag: "", - insertedBy: "orion-notify", - }); - } else { - let response; - let retry = 2; - while (retry > 0) - try { - response = await axios.post( - config.mapEndpoint, - { - sourceDataType: "json", - sourceDataURL: urlValue, - decodeOptions: { - decodeFrom: "json-stat", - }, - config: { - NGSI_entity: false, - ignoreValidation: true, - writers: [], - disableAjv: true, - mappingReport: true, - }, - dataModel: { - $schema: "http://json-schema.org/schema#", - $id: "dataModels/DataModelTemp.json", - title: "DataModelTemp", - description: "Bike Hire Docking Station", - type: "object", - properties: { - region: { - type: "string", - }, - source: { - type: "string", - }, - timestamp: { - type: "string", - }, - survey: { - type: "string", - }, - dimensions: { - type: "object", - }, - value: { - type: "integer", - }, + if (!mapID) { + const response = await axios.get(urlValue); + if (response?.data?.data?.datapoints) + await Datapoints.insertMany(response.data.data.datapoints); + else + await minioWriter.insertInDBs(response.data, { + name: id + "-" + path.basename(new URL(urlValue).pathname), + lastModified: new Date(), + versionId: "null", + isDeleteMarker: false, + bucketName: "orion-notify", + size: response.data.length, + isLatest: true, + etag: "", + insertedBy: "orion-notify", + }); + } else { + let response; + let retry = 2; + while (retry > 0) + try { + response = await axios.post( + config.mapEndpoint, + { + sourceDataType: "json", + sourceDataURL: urlValue, + decodeOptions: { + decodeFrom: "json-stat", + }, + config: { + NGSI_entity: false, + ignoreValidation: true, + writers: [], + disableAjv: true, + mappingReport: true, + }, + dataModel: { + $schema: "http://json-schema.org/schema#", + $id: "dataModels/DataModelTemp.json", + title: "DataModelTemp", + description: "Bike Hire Docking Station", + type: "object", + properties: { + region: { + type: "string", + }, + source: { + type: "string", + }, + timestamp: { + type: "string", + }, + survey: { + type: "string", + }, + dimensions: { + type: "object", + }, + value: { + type: "integer", }, }, - } /* + }, + } /* { //mapID, sourceDataURL: urlValue }*/, - { + { + headers: { + Authorization: `Bearer ${bearerToken}`, + }, + } + ); + retry -= 2; + try { + logger.info("Inserting datapoints into DB..."); + logger.info(response.data); + let outputId = response.data[response.data.length - 1].MAPPING_REPORT.outputId + let lastId + let purged = false + for (let chunkIndex = 0; (response.data[0] || response.data.id); chunkIndex++) { // Loop per gestire i chunk + //while (response.data[0] || response.data.id) { + logger.info(response.data.status) + response = await axios.get((config.sessionEndpoint || "http://localhost:5500/api/output?") + "id=" + outputId + "&lastId=" + lastId + "&index=" + chunkIndex, { headers: { - Authorization: `Bearer ${bearerToken}`, - }, - } - ); - retry -= 2; - try { - logger.info("Inserting datapoints into DB..."); - logger.info(response.data); - let outputId = response.data[response.data.length - 1].MAPPING_REPORT.outputId - let lastId - let purged = false - for (let chunkIndex = 0; (response.data[0] || response.data.id); chunkIndex++) { // Loop per gestire i chunk - //while (response.data[0] || response.data.id) { - logger.info(response.data.status) - response = await axios.get((config.sessionEndpoint || "http://localhost:5500/api/output?") + "id=" + outputId + "&lastId=" + lastId + "&index=" + chunkIndex, { - headers: { - Authorization: `Bearer ${bearerToken}` - } - }) - if (response.data[0]) { - if (!purged) - await Datapoints.deleteMany({ survey: response.data[0].survey }); // Cancellazione preliminare - const dataToInsert = response.data.map((d) => { // Preparazione dei dati - return { - ...d, - fromUrl: urlValue, - }; - }); + Authorization: `Bearer ${bearerToken}` + } + }) + if (response.data[0]) { + if (!purged && !config.upsertRecords) + await Datapoints.deleteMany({ survey: response.data[0].survey }); // Cancellazione preliminare + const dataToInsert = response.data.map((d) => { // Preparazione dei dati + return { + ...d, + fromUrl: urlValue, + }; + }); + if (config.upsertRecords) await Datapoints.upsertMany(dataToInsert); //.map(d => {return {...d, dimensions : {...(d.dimensions), year : d.dimensions.time}}})) //TODO check if datapoints or other data and generalize insertion - lastId = response.data[response.data.length - 1]?._id // Gestione indici per il prossimo loop - purged = true; + else + await Datapoints.insertMany(dataToInsert) + lastId = response.data[response.data.length - 1]?._id // Gestione indici per il prossimo loop + purged = true; + const surveyKey = dataToInsert[0].survey.toUpperCase().replace(/\./g, ""); + const dimensionsFound = await Dimensions.findOne({ survey: surveyKey }); // direttamente un singolo documento + const uniqueKeys = new Set(); + for (const obj of dataToInsert) { + for (const key in obj.dimensions) { + uniqueKeys.add(key); + } + } + if (dimensionsFound) { + for (const key in dimensionsFound.dimensions) { + uniqueKeys.add(key); + } } + let dimensionsObject = {} + for (let key of Array.from(uniqueKeys)) + dimensionsObject[key] = true + const dimensionObject = { + dimensions: dimensionsObject, + survey: surveyKey + }; + await Dimensions.findOneAndUpdate( + { survey: surveyKey }, + dimensionObject, + { upsert: true, new: true } + ); + logger.debug("Dimension object saved/updated:", dimensionObject); } - } catch (error) { - logger.error("Error inserting datapoints:", error); } } catch (error) { - logger.error( - "Error fetching mapped data from API Connector:", - error.response?.data || error.message - ); - try { - bearerToken = await updateJWT(true); - retry--; - } catch (e) { - logger.error("Error updating JWT:", e); - retry--; - } + logger.error("Error inserting datapoints:", error); + } + } catch (error) { + logger.error( + "Error fetching mapped data from API Connector:", + error.response?.data || error.message + ); + try { + bearerToken = await updateJWT(true); + retry--; + } catch (e) { + logger.error("Error updating JWT:", e); + retry--; } - //logger.info(response.data.lenght) + } + //logger.info(response.data.lenght) + + /*for (let i in response.data) + await minioWriter.insertInDBs(response.data[i], { + name: response.data[i].id || mapID + '-' + path.basename((new URL(urlValue)).pathname) + i, + lastModified: new Date(), + versionId: 'null', + isDeleteMarker: false, + bucketName: 'orion-notify', + size: response.data.length, + isLatest: true, + etag: '', + insertedBy: 'orion-notify' + });*/ + } + logger.info(`downloaded ${urlValue}`); + } + return "OK"; +} + +module.exports = { + + queue() { + return requestStack.length + }, - /*for (let i in response.data) - await minioWriter.insertInDBs(response.data[i], { - name: response.data[i].id || mapID + '-' + path.basename((new URL(urlValue)).pathname) + i, - lastModified: new Date(), - versionId: 'null', - isDeleteMarker: false, - bucketName: 'orion-notify', - size: response.data.length, - isLatest: true, - etag: '', - insertedBy: 'orion-notify' - });*/ - } - logger.info(`downloaded ${urlValue}`); + notifyPath: async (req, res) => { + let turn = requestStack.length + let result + requestStack.push([req, res]) + while (turn && requestStack.length > turn) + await sleep(100) + try { + result = await executeRequest(...requestStack[0]) } - return "OK"; + catch (error) { + logger.error(error) + return error + } + requestStack.shift() + return result }, - sync: minioWriter.sync, + sync() { + if (config.sourceConnectors.minioConnector) + minioWriter.sync() + if (config.sourceConnectors.apiConnector) + verifyLostSubscription() + }, }; diff --git a/config.template.js b/config.template.js index db78b82..8d65053 100644 --- a/config.template.js +++ b/config.template.js @@ -27,17 +27,23 @@ module.exports = { sessionEndpoint: "http://localhost:5500/api/output?", mapID: "", orion: { + protocol : "http", subscribe: true, deleteAllDuplicateSubscriptions: true, attrWithUrl: "datasetUrl", - orionBaseUrl: "http://localhost:1027", + orionBaseUrl: "http://localhost:1026", + hostname : "localhost", + port: 1026, + ngsiBrokerBaseUrl: "https://dx-lab.it/", notificationUrl: "http://localhost:3000/api/orion/subscribe", fiwareService: "", - fiwareServicePath: "" + fiwareServicePath: "", + checkSubscriptionInterval: 0 }, logLevel: "info", syncInterval: 86400000, doNotSyncAtStart: false, + upsertRecords: true, delays: 1, queryAllowedExtensions: ["csv", "json", "geojson"], parseCompatibilityMode: 0, diff --git a/index.js b/index.js index 95122c1..09c45a3 100644 --- a/index.js +++ b/index.js @@ -15,6 +15,13 @@ mongoose.connect(config.mongo, { useNewUrlParser: true, useUnifiedTopology: true app.use(express.urlencoded({ extended: false })); app.use(bodyParser.json()); app.use(config.basePath || "/api", routes); - app.listen(port, () => { logger.info(`Source connector server listens on http://localhost:${port}`); }); + app.listen(port, () => { + logger.info(`Source connector server listens on http://localhost:${port}`); + if (config.orion.checkSubscriptionInterval) + setInterval(common.verifyLostSubscription, config.checkSubscriptionInterval) + common.verifyLostSubscription().then(() => { + logger.info("lost subscription verified") + }) + }); logger.info(`Node.js version: ${process.version}`); }) diff --git a/inputConnectors/apiConnector.js b/inputConnectors/apiConnector.js index c773c06..7c1b7b2 100644 --- a/inputConnectors/apiConnector.js +++ b/inputConnectors/apiConnector.js @@ -7,6 +7,17 @@ function getEndpointVersionApi(subId) { return (config.orion.apiVersion == "v2" || (subId && !subId.startsWith("urn:ngsi-ld:Subscription:")) ? "/v2/subscriptions" : "/ngsi-ld/v1/subscriptions") } +function runSubscription() { + createOrionSubscription({ + orionBaseUrl: config.orion?.orionBaseUrl || 'http://localhost:1027', + notificationUrl: config.orion?.notificationUrl || 'http://host.docker.internal:3000/api/orion/subscribe', + fiwareService: config.orion?.fiwareService, + fiwareServicePath: config.orion?.fiwareServicePath + }); +} + +setInterval(runSubscription, 24 * 60 * 60 * 1000); + async function createOrionSubscription({ orionBaseUrl, notificationUrl, @@ -54,6 +65,7 @@ async function createOrionSubscription({ logger.info(url, sub, { headers }) const res = await axios.post(url, sub, { headers }); logger.info({ status: res.status }) + config.orion.purgeSubscriptionsAtStart = false;//only purge at start if there are duplicates, otherwise we can end in a loop of deleting and creating subscription if the orion instance is restarted while the query engine is restarting return res.data; } @@ -67,7 +79,13 @@ if (config.orion.subscribe) if (sub != "Already existing subscription found for the same notification URL.") logger.info("Orion subscription created: " + sub) }).catch(err => { - logger.error("Error creating Orion subscription: ", err.response?.data || err.message || err) + logger.error("Error creating Orion subscription: ")//, err.response?.data || err.message || err) + logger.error({ + config: err.config, + status: err.response?.status, + ststusText: err.response?.statusText, + data: err.response?.data + }) err.response?.config?.data && logger.error(err.response?.config?.data) }) @@ -88,16 +106,16 @@ async function getSubscriptions() { async function deleteSubscription(subId) { return (await axios.delete(`${(config.orion.orionBaseUrl || 'http://localhost:1027')}${getEndpointVersionApi(subId)}/${subId}`, (config?.orion?.fiwareService ? + { + headers: { - headers: - { - 'Fiware-Service': config.orion.fiwareService || 'service', - 'Fiware-ServicePath': config.orion.fiwareServicePath || '/service' - } + 'Fiware-Service': config.orion.fiwareService || 'service', + 'Fiware-ServicePath': config.orion.fiwareServicePath || '/service' } - : - {} - ))).data + } + : + {} + ))).data } function typesCheck(subTypes) { @@ -146,4 +164,4 @@ async function checkMultipleSubscriptions(notificationUrl) { return count; } -module.exports = { createOrionSubscription }; \ No newline at end of file +module.exports = { createOrionSubscription, getEndpointVersionApi }; \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 1d639fc..8dc7f12 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,12 +10,12 @@ "license": "ISC", "dependencies": { "apollo-server-express": "^3.13.0", - "axios": "^1.7.2", + "axios": "1.7.2", "cors": "^2.8.5", "dotenv": "^16.4.5", "express": "^4.19.2", "ioredis": "^5.3.2", - "jsonwebtoken": "^9.0.2", + "jsonwebtoken": "^9.0.3", "minio": "^8.0.0", "mongoose": "^8.3.4", "nodemon": "^3.1.0", @@ -1625,11 +1625,11 @@ "integrity": "sha512-xHjhDr3cNBK0BzdUJSPXZntQUx/mwMS5Rw4A7lPJ90XGAO6ISP/ePDNuo0vhqOZU+UD5JoodwCAAoZQd3FeAKw==" }, "node_modules/jsonwebtoken": { - "version": "9.0.2", - "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.2.tgz", - "integrity": "sha512-PRp66vJ865SSqOlgqS8hujT5U4AOgMfhrwYIuIhfKaoSCZcirrmASQr8CX7cUg+RMih+hgznrjp99o+W4pJLHQ==", + "version": "9.0.3", + "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.3.tgz", + "integrity": "sha512-MT/xP0CrubFRNLNKvxJ2BYfy53Zkm++5bX9dtuPbqAeQpTVe0MQTFhao8+Cp//EmJp244xt6Drw/GVEGCUj40g==", "dependencies": { - "jws": "^3.2.2", + "jws": "^4.0.1", "lodash.includes": "^4.3.0", "lodash.isboolean": "^3.0.3", "lodash.isinteger": "^4.0.4", @@ -1651,21 +1651,21 @@ "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" }, "node_modules/jwa": { - "version": "1.4.1", - "resolved": "https://registry.npmjs.org/jwa/-/jwa-1.4.1.tgz", - "integrity": "sha512-qiLX/xhEEFKUAJ6FiBMbes3w9ATzyk5W7Hvzpa/SLYdxNtng+gcurvrI7TbACjIXlsJyr05/S1oUhZrc63evQA==", + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/jwa/-/jwa-2.0.1.tgz", + "integrity": "sha512-hRF04fqJIP8Abbkq5NKGN0Bbr3JxlQ+qhZufXVr0DvujKy93ZCbXZMHDL4EOtodSbCWxOqR8MS1tXA5hwqCXDg==", "dependencies": { - "buffer-equal-constant-time": "1.0.1", + "buffer-equal-constant-time": "^1.0.1", "ecdsa-sig-formatter": "1.0.11", "safe-buffer": "^5.0.1" } }, "node_modules/jws": { - "version": "3.2.2", - "resolved": "https://registry.npmjs.org/jws/-/jws-3.2.2.tgz", - "integrity": "sha512-YHlZCB6lMTllWDtSPHz/ZXTsi8S00usEV6v1tjq8tOUZzw7DpSDWVXjXDre6ed1w/pd495ODpHZYSdkRTsa0HA==", + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/jws/-/jws-4.0.1.tgz", + "integrity": "sha512-EKI/M/yqPncGUUh44xz0PxSidXFr/+r0pA70+gIYhjv+et7yxM+s29Y+VGDkovRofQem0fs7Uvf4+YmAdyRduA==", "dependencies": { - "jwa": "^1.4.1", + "jwa": "^2.0.1", "safe-buffer": "^5.0.1" } }, @@ -4203,11 +4203,11 @@ "integrity": "sha512-xHjhDr3cNBK0BzdUJSPXZntQUx/mwMS5Rw4A7lPJ90XGAO6ISP/ePDNuo0vhqOZU+UD5JoodwCAAoZQd3FeAKw==" }, "jsonwebtoken": { - "version": "9.0.2", - "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.2.tgz", - "integrity": "sha512-PRp66vJ865SSqOlgqS8hujT5U4AOgMfhrwYIuIhfKaoSCZcirrmASQr8CX7cUg+RMih+hgznrjp99o+W4pJLHQ==", + "version": "9.0.3", + "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.3.tgz", + "integrity": "sha512-MT/xP0CrubFRNLNKvxJ2BYfy53Zkm++5bX9dtuPbqAeQpTVe0MQTFhao8+Cp//EmJp244xt6Drw/GVEGCUj40g==", "requires": { - "jws": "^3.2.2", + "jws": "^4.0.1", "lodash.includes": "^4.3.0", "lodash.isboolean": "^3.0.3", "lodash.isinteger": "^4.0.4", @@ -4227,21 +4227,21 @@ } }, "jwa": { - "version": "1.4.1", - "resolved": "https://registry.npmjs.org/jwa/-/jwa-1.4.1.tgz", - "integrity": "sha512-qiLX/xhEEFKUAJ6FiBMbes3w9ATzyk5W7Hvzpa/SLYdxNtng+gcurvrI7TbACjIXlsJyr05/S1oUhZrc63evQA==", + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/jwa/-/jwa-2.0.1.tgz", + "integrity": "sha512-hRF04fqJIP8Abbkq5NKGN0Bbr3JxlQ+qhZufXVr0DvujKy93ZCbXZMHDL4EOtodSbCWxOqR8MS1tXA5hwqCXDg==", "requires": { - "buffer-equal-constant-time": "1.0.1", + "buffer-equal-constant-time": "^1.0.1", "ecdsa-sig-formatter": "1.0.11", "safe-buffer": "^5.0.1" } }, "jws": { - "version": "3.2.2", - "resolved": "https://registry.npmjs.org/jws/-/jws-3.2.2.tgz", - "integrity": "sha512-YHlZCB6lMTllWDtSPHz/ZXTsi8S00usEV6v1tjq8tOUZzw7DpSDWVXjXDre6ed1w/pd495ODpHZYSdkRTsa0HA==", + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/jws/-/jws-4.0.1.tgz", + "integrity": "sha512-EKI/M/yqPncGUUh44xz0PxSidXFr/+r0pA70+gIYhjv+et7yxM+s29Y+VGDkovRofQem0fs7Uvf4+YmAdyRduA==", "requires": { - "jwa": "^1.4.1", + "jwa": "^2.0.1", "safe-buffer": "^5.0.1" } }, diff --git a/package.json b/package.json index a429007..045b234 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,7 @@ "license": "ISC", "dependencies": { "apollo-server-express": "^3.13.0", - "axios": "^1.7.2", + "axios": "1.7.2", "cors": "^2.8.5", "dotenv": "^16.4.5", "express": "^4.19.2", diff --git a/token.template.js b/token.template.js new file mode 100644 index 0000000..6ccd301 --- /dev/null +++ b/token.template.js @@ -0,0 +1 @@ +module.exports = "" \ No newline at end of file diff --git a/utils/common.js b/utils/common.js index 271b254..00c37cb 100644 --- a/utils/common.js +++ b/utils/common.js @@ -1,5 +1,8 @@ const logger = require('percocologger') const log = logger.info +const axios = require("axios") +const config = require("../config") +const Entity = require("../api/models/Entity") function objectCheck(objs) { for (let obj of objs) @@ -40,7 +43,7 @@ function convertCSVtoJSON(csvData) { obj[this.deleteSpaces(headers[j].replaceAll(/['"]/g, ''))] = this.deleteSpaces(currentLine[j]?.replaceAll(/['"]/g, '')); results.push(obj); } - + return JSON.stringify(results); } @@ -64,6 +67,37 @@ function syncEntries(obj, visibility, entries) { module.exports = { + async verifyLostSubscription() { + try { + //la seguente riga non funzionerà . Probabilmente bisogna chiamare l'ngsi broker + //let entities = await axios.get(config.orion.orionBaseUrl + apiConnector.getEndpointVersionApi().split("subscriptions") + "/entities?type=DistributionDCAT-AP") + let ngsiBrokerUrl = (config.orion.ngsiBrokerBaseUrl + "/api/distributiondcatap").replaceAll("//", "/") + let entities = (await axios.get(ngsiBrokerUrl)).data + for (let ent of entities) { + const existingEntity = await Entity.findOne({ id: ent.id }) + logger.info(ent) + if (!existingEntity || (existingEntity && existingEntity.modifiedDate?.value["@value"] != ent.modifiedDate["@value"])) + try { + await axios.post("http://localhost:" + (config.port || 3001) + "/api/orion/subscribe/6914a252ddb96948ee67b2e1", { + "id": "self", + "type": "Notification", + "subscriptionId": "self", + "notifiedAt": Date.now(), + "data": [ + ent + ] + }) + } catch (error) { + logger.error(error.response?.data ? { axios: error.response.data } : error) + } + } + logger.info("Lost subscription verified") + } + catch (error) { + logger.error(error.response?.data ? { axios: error.response.data } : error) + } + }, + sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); }, @@ -83,7 +117,7 @@ module.exports = { }, async getEntries(obj, type, name, entries) {// csv, jsonArray, json - + let visibility = getVisibility(name) if (!obj[0].csv && Array.isArray(obj[0].json) && type != "jsonArray") type = "jsonArray" //throw new Error("obj is a jsonArray and not " + type) @@ -98,10 +132,10 @@ module.exports = { else { logger.trace(obj[0]) syncEntries(obj[0], visibility, entries) - + return } - + logger.trace("so it was a geojson") } logger.trace("Here's obj before flatmap") @@ -111,7 +145,7 @@ module.exports = { obj = obj.map(o => o.properties) for (let o of obj) syncEntries(o, visibility, entries) - + return }, @@ -156,7 +190,7 @@ module.exports = { ] const headers = possibleHeaders[0].length > possibleHeaders[1].length ? possibleHeaders[0] : possibleHeaders[1] const results = []; - + for (let i = 1; i < lines.length; i++) { const obj = {}; const currentLine = lines[i].trim().split(possibleHeaders[0].length > possibleHeaders[1].length ? "," : ";"); @@ -164,7 +198,7 @@ module.exports = { obj[this.deleteSpaces(headers[j].replaceAll(/['"]/g, ''))] = this.deleteSpaces(currentLine[j]?.replaceAll(/['"]/g, '')); results.push(obj); } - + return JSON.stringify(results); }, @@ -174,7 +208,7 @@ module.exports = { checkConfig(configIn, configTemplate) { for (let key in configTemplate) { - if (typeof configIn[key] == "object") + if (typeof configIn[key] == "object") configIn[key] = this.checkConfig(configIn[key], configTemplate[key]) else if (configIn[key] == undefined) { logger.warn(`Config key ${key} is missing, using default value`)