diff --git a/CMakeLists.txt b/CMakeLists.txt index ea277eeb2..ee70c7335 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -264,6 +264,7 @@ else() src/gen/gen-base.cpp src/gen/gen-create.cpp src/gen/gen-discrete-isolation.cpp + src/gen/gen-grouped-linemerge.cpp src/gen/gen-rivers.cpp src/gen/gen-tile-builtup.cpp src/gen/gen-tile-raster.cpp diff --git a/flex-config/gen/grouped-linemerge.lua b/flex-config/gen/grouped-linemerge.lua new file mode 100644 index 000000000..5e41c5b13 --- /dev/null +++ b/flex-config/gen/grouped-linemerge.lua @@ -0,0 +1,106 @@ +-- This config example file is released into the Public Domain. +-- +-- This Lua config demonstrates the 'grouped-linemerge' generalization. It +-- merges connected lines that share the same set of grouping columns into +-- single (multi-)lines, the equivalent of +-- +-- SELECT cols..., (ST_Dump(ST_LineMerge(ST_Collect(geom)))).geom +-- FROM roads GROUP BY cols... +-- +-- but done globally and maintained incrementally on updates. A typical use is +-- merging road segments that render identically (same name/ref/highway/layer) +-- so that labels and route shields are placed on the whole road instead of on +-- each individual OSM way, without the artifacts you get when merging only +-- within a tile. +-- +-- NOTE THAT THE GENERALIZATION SUPPORT IS EXPERIMENTAL AND MIGHT CHANGE +-- WITHOUT NOTICE! +-- +-- Workflow: +-- * Import as usual: osm2pgsql -O flex -S grouped-linemerge.lua DATA.osm.pbf +-- * Build the merged table: osm2pgsql-gen -S grouped-linemerge.lua +-- * Apply an update: osm2pgsql -a -O flex -S grouped-linemerge.lua CHANGES.osc.gz +-- * Update the merged table: osm2pgsql-gen -a -S grouped-linemerge.lua + +-- An expire output with an 'endpoint_table' records the exact endpoints (start +-- and end point) of every way added/edited/deleted during an update, as POINT +-- rows. 
The grouped-linemerge generalization consumes these points: it walks +-- each affected connected component out from the changed endpoints and +-- re-merges only those, matching by exact endpoint equality (no tiles, no +-- area scan). Deletes contribute the old way's endpoints automatically. +local exp_roads = osm2pgsql.define_expire_output({ + endpoint_table = 'exp_roads_endpoints', maxzoom = 14, -- maxzoom looks mandatory for every expire output (TODO confirm); no tiles are expired for an endpoint-only output +}) + +-- The source table with the original road segments (one row per OSM way). +local roads = osm2pgsql.define_table({ + name = 'roads', + ids = { type = 'way', id_column = 'way_id' }, + columns = { + { column = 'name', type = 'text' }, + { column = 'ref', type = 'text' }, + { column = 'highway', type = 'text' }, + { column = 'layer', type = 'int' }, + -- Attach the expire output to the geometry so that any change to a + -- road's geometry (add/modify/delete) records its endpoints. + { column = 'geom', type = 'linestring', not_null = true, + expire = { { output = exp_roads } } }, + } +}) + +-- The destination table with the merged roads. Its columns are exactly the +-- grouping columns plus the geometry. It has no OSM id column (it is derived +-- data maintained by osm2pgsql-gen, not by the normal update process); the +-- warning osm2pgsql prints about that is expected. 
+osm2pgsql.define_table({ + name = 'roads_merged', + columns = { + { column = 'name', type = 'text' }, + { column = 'ref', type = 'text' }, + { column = 'highway', type = 'text' }, + { column = 'layer', type = 'int' }, + { column = 'geom', type = 'linestring', not_null = true }, + } +}) + +function osm2pgsql.process_way(object) + local highway = object.tags.highway + if not highway then + return + end + roads:insert({ + name = object.tags.name, + ref = object.tags.ref, + highway = highway, + layer = tonumber(object.tags.layer), + geom = object:as_linestring(), + }) +end + +function osm2pgsql.process_gen() + osm2pgsql.run_gen('grouped-linemerge', { + name = 'roads', -- name (for logging) + debug = false, -- set to true for more detailed debug output + src_table = 'roads', -- input table with the line segments + dest_table = 'roads_merged', -- output table for the merged lines + geom_column = 'geom', -- geometry column (same in src and dest) + + -- Lines are merged when ALL of these columns are equal (NULLs compare + -- equal). Pass them as a comma-separated list. + group_by_columns = 'name, ref, highway, layer', + + -- Optional pre-filter (SQL boolean expression on the source columns). + -- Lines not matching are completely excluded from the generalization. + -- Here we only merge roads that carry a label or a shield. + where = 'name IS NOT NULL OR ref IS NOT NULL', + + -- In append mode, the table of exact changed-way endpoints to consume + -- (written by the expire output's 'endpoint_table' above). + endpoint_table = 'exp_roads_endpoints', + + -- Create functional endpoint indexes on the src/dest tables in create + -- mode. These make the incremental component walk fast. Set to false + -- if you manage the indexes yourself. 
+ create_indexes = true, + }) +end diff --git a/src/expire-output.cpp b/src/expire-output.cpp index 7babda0e1..cdee7bb28 100644 --- a/src/expire-output.cpp +++ b/src/expire-output.cpp @@ -10,9 +10,12 @@ #include "expire-output.hpp" #include "format.hpp" +#include "geom.hpp" +#include "hex.hpp" #include "logging.hpp" #include "pgsql.hpp" #include "tile.hpp" +#include "wkb.hpp" #include #include @@ -50,10 +53,43 @@ void expire_output_t::add_tiles( m_tiles.insert(dirty_tiles.cbegin(), dirty_tiles.cend()); } +void expire_output_t::add_endpoints(geom::geometry_t const &geom) +{ + auto const add_line = [this](geom::linestring_t const &line) { + if (line.empty()) { + return; + } + auto const &first = line.front(); + auto const &last = line.back(); + m_endpoints.emplace(first.x(), first.y()); + m_endpoints.emplace(last.x(), last.y()); + }; + + std::lock_guard const guard{*m_tiles_mutex}; + + m_endpoint_srid = geom.srid(); + if (geom.is_linestring()) { + add_line(geom.get()); + } else if (geom.is_multilinestring()) { + for (auto const &line : geom.get()) { + add_line(line); + } + } +} + bool expire_output_t::empty() noexcept { std::lock_guard const guard{*m_tiles_mutex}; - return m_tiles.empty(); + return m_tiles.empty() && m_endpoints.empty(); +} + +std::vector> expire_output_t::get_endpoints() +{ + std::lock_guard const guard{*m_tiles_mutex}; + std::vector> endpoints(m_endpoints.cbegin(), + m_endpoints.cend()); + m_endpoints.clear(); + return endpoints; } quadkey_list_t expire_output_t::get_tiles() @@ -79,6 +115,9 @@ expire_output_t::output(connection_params_t const &connection_params) if (!m_table.empty()) { num = output_tiles_to_table(get_tiles(), connection_params); } + if (!m_endpoint_table.empty()) { + output_endpoints_to_table(get_endpoints(), connection_params); + } return num; } @@ -140,16 +179,49 @@ std::size_t expire_output_t::output_tiles_to_table( return count; } +std::size_t expire_output_t::output_endpoints_to_table( + std::vector> const &endpoints, + 
connection_params_t const &connection_params) const +{ + if (endpoints.empty()) { + return 0; + } + + auto const qn = qualified_name(m_schema, m_endpoint_table); + + pg_conn_t const db_connection{connection_params, "expire"}; + + db_connection.prepare("insert_endpoints", + "INSERT INTO {} (geom) VALUES ($1::geometry)", qn); + + for (auto const &[x, y] : endpoints) { + geom::geometry_t const point{geom::point_t{x, y}, m_endpoint_srid}; + db_connection.exec_prepared("insert_endpoints", + util::encode_hex(geom_to_ewkb(point))); + } + + return endpoints.size(); +} + void expire_output_t::create_output_table(pg_conn_t const &db_connection) const { - auto const qn = qualified_name(m_schema, m_table); - db_connection.exec( - "CREATE TABLE IF NOT EXISTS {} (" - " zoom int4 NOT NULL," - " x int4 NOT NULL," - " y int4 NOT NULL," - " first timestamp with time zone DEFAULT CURRENT_TIMESTAMP(0)," - " last timestamp with time zone DEFAULT CURRENT_TIMESTAMP(0)," - " PRIMARY KEY (zoom, x, y))", - qn); + if (!m_table.empty()) { + auto const qn = qualified_name(m_schema, m_table); + db_connection.exec( + "CREATE TABLE IF NOT EXISTS {} (" + " zoom int4 NOT NULL," + " x int4 NOT NULL," + " y int4 NOT NULL," + " first timestamp with time zone DEFAULT CURRENT_TIMESTAMP(0)," + " last timestamp with time zone DEFAULT CURRENT_TIMESTAMP(0)," + " PRIMARY KEY (zoom, x, y))", + qn); + } + + if (!m_endpoint_table.empty()) { + auto const qn = qualified_name(m_schema, m_endpoint_table); + db_connection.exec("CREATE TABLE IF NOT EXISTS {} (" + " geom geometry(Point) NOT NULL)", + qn); + } } diff --git a/src/expire-output.hpp b/src/expire-output.hpp index 312ed7de4..182effd33 100644 --- a/src/expire-output.hpp +++ b/src/expire-output.hpp @@ -17,9 +17,11 @@ #include #include #include +#include #include #include #include +#include constexpr std::size_t DEFAULT_MAX_TILES_GEOMETRY = 10'000'000; constexpr std::size_t DEFAULT_MAX_TILES_OVERALL = 50'000'000; @@ -27,6 +29,10 @@ constexpr std::size_t 
DEFAULT_MAX_TILES_OVERALL = 50'000'000; class pg_conn_t; class connection_params_t; +namespace geom { +class geometry_t; +} // namespace geom + /** * Output for tile expiry. */ @@ -53,6 +59,35 @@ class expire_output_t m_table = std::move(table); } + std::string const &endpoint_table() const noexcept + { + return m_endpoint_table; + } + + void set_endpoint_table(std::string table) + { + m_endpoint_table = std::move(table); + } + + /// Does this output write expired tiles (to a file and/or table)? + bool has_tile_output() const noexcept + { + return !m_filename.empty() || !m_table.empty(); + } + + /// Does this output write the endpoints of changed geometries to a table? + bool has_endpoint_output() const noexcept + { + return !m_endpoint_table.empty(); + } + + /** + * Record the endpoints (first and last point of each linestring) of a + * changed geometry. They are written to the endpoint table on output. + * Non-(multi)linestring geometries contribute no endpoints. Thread-safe. + */ + void add_endpoints(geom::geometry_t const &geom); + uint32_t minzoom() const noexcept { return m_minzoom; } void set_minzoom(uint32_t minzoom) noexcept { m_minzoom = minzoom; } @@ -116,6 +151,19 @@ class expire_output_t output_tiles_to_table(quadkey_list_t const &tiles_at_maxzoom, connection_params_t const &connection_params) const; + /// Take and clear the collected endpoints. Thread-safe. + std::vector> get_endpoints(); + + /** + * Write the collected endpoints as POINT geometries to the endpoint table. 
+ * + * \param endpoints The endpoint coordinates to write + * \param connection_params Database connection parameters + */ + std::size_t + output_endpoints_to_table(std::vector> const &endpoints, + connection_params_t const &connection_params) const; + /** * Access to the m_tiles collection of expired tiles must go through * this mutex, because it can happend from several threads at the same @@ -136,6 +184,16 @@ class expire_output_t /// The table (if any) for output std::string m_table; + /// The table (if any) for changed-geometry endpoints + std::string m_endpoint_table; + + /// Collected endpoints (deduplicated) of changed geometries. Guarded by + /// m_tiles_mutex (same access pattern as m_tiles). + std::set> m_endpoints; + + /// SRID of the collected endpoints (all geometries share the table's SRID). + int m_endpoint_srid = 0; + /// Minimum zoom level for output uint32_t m_minzoom = 0; diff --git a/src/flex-lua-expire-output.cpp b/src/flex-lua-expire-output.cpp index 172f29e8b..2d2ec3032 100644 --- a/src/flex-lua-expire-output.cpp +++ b/src/flex-lua-expire-output.cpp @@ -39,10 +39,20 @@ create_expire_output(lua_State *lua_state, std::string const &default_schema, new_expire_output.set_schema_and_table(schema, table); lua_pop(lua_state, 2); // "schema" and "table" + // optional "endpoint_table" field: write the endpoints of changed + // geometries as POINT rows (in the same schema) instead of, or in addition + // to, expired tiles. 
+ auto const *endpoint_table = luaX_get_table_string( + lua_state, "endpoint_table", -1, "The expire output", ""); + check_identifier(endpoint_table, "endpoint_table field"); + new_expire_output.set_endpoint_table(endpoint_table); + lua_pop(lua_state, 1); // "endpoint_table" + if (new_expire_output.filename().empty() && - new_expire_output.table().empty()) { - throw std::runtime_error{ - "Must set 'filename' and/or 'table' on expire output."}; + new_expire_output.table().empty() && + new_expire_output.endpoint_table().empty()) { + throw std::runtime_error{"Must set 'filename', 'table', and/or " + "'endpoint_table' on expire output."}; } // required "maxzoom" field diff --git a/src/flex-table-column.cpp b/src/flex-table-column.cpp index 90911eaf6..588355afd 100644 --- a/src/flex-table-column.cpp +++ b/src/flex-table-column.cpp @@ -339,17 +339,34 @@ void flex_table_column_t::do_expire( for (auto const &expire_config : m_expires) { assert(expire_config.expire_output < expire->size()); - auto &expire_tiles = expire->at(expire_config.expire_output); - - if (!expire_config.diff_expire || !enable_diff_expire || - geoms_old->empty() || geoms_new->empty()) { - separate_expire(*geoms_old, expire_config, expire_tiles, - expire_outputs); - separate_expire(*geoms_new, expire_config, expire_tiles, + auto &expire_output = expire_outputs->at(expire_config.expire_output); + + if (expire_output.has_tile_output()) { + auto &expire_tiles = expire->at(expire_config.expire_output); + if (!expire_config.diff_expire || !enable_diff_expire || + geoms_old->empty() || geoms_new->empty()) { + separate_expire(*geoms_old, expire_config, expire_tiles, + expire_outputs); + separate_expire(*geoms_new, expire_config, expire_tiles, + expire_outputs); + } else { + diff_expire(geoms_old, geoms_new, expire_config, expire_tiles, expire_outputs); - } else { - diff_expire(geoms_old, geoms_new, expire_config, expire_tiles, - expire_outputs); + } + } + + // Record the endpoints of the changed geometry (old 
and new), so a + // consumer (e.g. the grouped-linemerge generalizer) can re-merge only + // the exact connected components touched, instead of everything in a + // tile. Deletes provide the old geometry via the geometry cache, so + // their endpoints are captured here too. + if (expire_output.has_endpoint_output()) { + for (auto const &geom : *geoms_old) { + expire_output.add_endpoints(geom); + } + for (auto const &geom : *geoms_new) { + expire_output.add_endpoints(geom); + } } } } diff --git a/src/gen/gen-create.cpp b/src/gen/gen-create.cpp index 55200b593..37b28de48 100644 --- a/src/gen/gen-create.cpp +++ b/src/gen/gen-create.cpp @@ -12,6 +12,7 @@ #include "format.hpp" #include "gen-base.hpp" #include "gen-discrete-isolation.hpp" +#include "gen-grouped-linemerge.hpp" #include "gen-rivers.hpp" #include "gen-tile-builtup.hpp" #include "gen-tile-raster.hpp" @@ -31,6 +32,10 @@ std::unique_ptr create_generalizer(std::string const &strategy, if (strategy == "discrete-isolation") { return std::make_unique(connection, append, params); } + if (strategy == "grouped-linemerge") { + return std::make_unique(connection, append, + params); + } if (strategy == "raster-union") { return std::make_unique(connection, append, params); diff --git a/src/gen/gen-grouped-linemerge.cpp b/src/gen/gen-grouped-linemerge.cpp new file mode 100644 index 000000000..a923f2969 --- /dev/null +++ b/src/gen/gen-grouped-linemerge.cpp @@ -0,0 +1,336 @@ +/** + * SPDX-License-Identifier: GPL-2.0-or-later + * + * This file is part of osm2pgsql (https://osm2pgsql.org/). + * + * Copyright (C) 2006-2026 by the osm2pgsql developer community. + * For a full list of authors see the git log. 
+ */ + +#include "gen-grouped-linemerge.hpp" + +#include "format.hpp" +#include "logging.hpp" +#include "params.hpp" +#include "pgsql.hpp" + +#include + +#include +#include + +namespace { + +std::string trim(std::string const &str) +{ + auto const begin = str.find_first_not_of(" \t\n\r"); + if (begin == std::string::npos) { + return {}; + } + auto const end = str.find_last_not_of(" \t\n\r"); + return str.substr(begin, end - begin + 1); +} + +} // anonymous namespace + +gen_grouped_linemerge_t::gen_grouped_linemerge_t(pg_conn_t *connection, + bool append, params_t *params) +: gen_base_t(connection, append, params), m_timer_merge(add_timer("merge")), + m_timer_walk(add_timer("walk")), m_timer_delete(add_timer("delete")) +{ + check_src_dest_table_params_exist(); + + // Parse the comma-separated list of grouping columns into the SQL + // fragments we need: + // * group_cols - quoted list "a", "b" (SELECT/GROUP BY/INSERT) + // * group_cols_l - l."a", l."b" (read from source alias) + // * group_cols_l_gk - l."a" AS "gk_a", ... (frontier of the walk) + // * group_cols_gk - "a" AS "gk_a", ... (seed of the walk) + // * group_join - l."a" IS NOT DISTINCT FROM n."gk_a" AND ... + // The walk's frontier table carries the grouping values under "gk_"- + // prefixed names so that an (optional) user 'where' filter, which uses the + // unqualified source column names, is never ambiguous against the frontier. + // IS NOT DISTINCT FROM makes NULLs (e.g. an unnamed road) compare equal, + // matching the GROUP BY semantics. 
+ auto const group_by = get_params().get_string("group_by_columns", ""); + if (group_by.empty()) { + throw fmt_error("Missing 'group_by_columns' parameter in" + " generalizer{}.", + context()); + } + + std::string group_cols; + std::string group_cols_l; + std::string group_cols_l_gk; + std::string group_cols_gk; + std::string group_join; // l (source) vs n (reached node, gk_ prefixed) + std::string group_join_dn; // d (dest) vs n (reached node, gk_ prefixed) + bool first = true; + for (auto const &raw : osmium::split_string(group_by, ',')) { + auto const col = trim(raw); + if (col.empty()) { + continue; + } + check_identifier(col, "group_by_columns"); + if (!first) { + group_cols += ", "; + group_cols_l += ", "; + group_cols_l_gk += ", "; + group_cols_gk += ", "; + group_join += " AND "; + group_join_dn += " AND "; + } + group_cols += fmt::format(R"("{}")", col); + group_cols_l += fmt::format(R"(l."{}")", col); + group_cols_l_gk += fmt::format(R"(l."{0}" AS "gk_{0}")", col); + group_cols_gk += fmt::format(R"("{0}" AS "gk_{0}")", col); + group_join += + fmt::format(R"(l."{0}" IS NOT DISTINCT FROM n."gk_{0}")", col); + group_join_dn += + fmt::format(R"(d."{0}" IS NOT DISTINCT FROM n."gk_{0}")", col); + first = false; + } + if (first) { + throw fmt_error("Parameter 'group_by_columns' is empty in" + " generalizer{}.", + context()); + } + + params->set("group_cols", group_cols); + params->set("group_cols_l", group_cols_l); + params->set("group_cols_l_gk", group_cols_l_gk); + params->set("group_cols_gk", group_cols_gk); + params->set("group_join", group_join); + params->set("group_join_dn", group_join_dn); + + // Optional pre-filter. Lines not matching this are completely excluded + // from the generalization (they never enter the destination table and are + // never walked). The filter is a SQL boolean expression on the source + // columns. We default to 'true' so it composes everywhere, and build a + // matching predicate for the (partial) endpoint indexes. 
+ auto const filter = get_params().get_string("where", ""); + params->set("where", filter.empty() ? std::string{"true"} + : "(" + filter + ")"); + params->set("index_predicate", + filter.empty() ? std::string{} : "WHERE (" + filter + ")"); + + // Names for the functional endpoint indexes optionally created on the + // source table in create mode (and needed for fast walks in append mode). + auto const src_table = get_params().get_identifier("src_table"); + params->set("idx_startpt", src_table + "_glm_startpt"); + params->set("idx_endpt", src_table + "_glm_endpt"); + + // The append-mode delete looks up destination rows by their endpoint + // points, so the destination gets matching functional indexes too. + auto const dest_table = get_params().get_identifier("dest_table"); + params->set("idx_dest_startpt", dest_table + "_glm_startpt"); + params->set("idx_dest_endpt", dest_table + "_glm_endpt"); + + if (append_mode()) { + if (!get_params().has("endpoint_table")) { + throw fmt_error("Missing 'endpoint_table' parameter in" + " generalizer{} (required in append mode).", + context()); + } + params->set("endpoints", qualified_name(get_params().get_identifier( + "schema"), + get_params().get_identifier( + "endpoint_table"))); + } +} + +void gen_grouped_linemerge_t::process() +{ + if (append_mode()) { + process_append(); + } else { + process_create(); + } +} + +void gen_grouped_linemerge_t::process_create() +{ + if (get_params().get_bool("create_indexes", true)) { + log_gen("Creating endpoint indexes on source table..."); + dbexec( + R"(CREATE INDEX IF NOT EXISTS "{idx_startpt}" ON {src} USING btree)" + R"( (ST_StartPoint("{geom_column}")) {index_predicate})"); + dbexec( + R"(CREATE INDEX IF NOT EXISTS "{idx_endpt}" ON {src} USING btree)" + R"( (ST_EndPoint("{geom_column}")) {index_predicate})"); + } + + log_gen("Merging lines by group..."); + timer(m_timer_merge).start(); + connection().exec("BEGIN"); + dbexec("TRUNCATE {dest}"); + auto const result = dbexec(R"( +INSERT 
INTO {dest} ({group_cols}, "{geom_column}") + SELECT {group_cols}, + (ST_Dump(ST_LineMerge(ST_Collect("{geom_column}" + ORDER BY "{geom_column}")))).geom + FROM {src} + WHERE {where} + GROUP BY {group_cols} +)"); + connection().exec("COMMIT"); + timer(m_timer_merge).stop(); + log_gen("Inserted {} merged linestrings.", result.affected_rows()); + + dbexec("ANALYZE {dest}"); + + if (get_params().get_bool("create_indexes", true)) { + log_gen("Creating endpoint indexes on destination table..."); + dbexec(R"(CREATE INDEX IF NOT EXISTS "{idx_dest_startpt}" ON {dest})" + R"( USING btree (ST_StartPoint("{geom_column}")))"); + dbexec(R"(CREATE INDEX IF NOT EXISTS "{idx_dest_endpt}" ON {dest})" + R"( USING btree (ST_EndPoint("{geom_column}")))"); + } +} + +void gen_grouped_linemerge_t::process_append() +{ + connection().exec("BEGIN"); + + // Step 1: Consume the exact endpoints of every way added/edited/deleted in + // this update. They were captured by the expire output's endpoint table + // (deletes contribute the old way's endpoints via the geometry cache). We + // seed and clean up by exact endpoint equality, so the functional endpoint + // btree indexes are used and unrelated roads merely passing nearby are not + // touched. + dbexec(R"( +CREATE TEMP TABLE _glm_points ON COMMIT DROP AS + WITH consumed AS ( + DELETE FROM {endpoints} RETURNING "geom" + ) + SELECT DISTINCT "geom" AS pt FROM consumed +)"); + + auto const point_count = dbexec("SELECT count(*) FROM _glm_points"); + if (std::strtoll(point_count.get_value(0, 0), nullptr, 10) == 0) { + log_gen("No changed endpoints, nothing to do."); + connection().exec("COMMIT"); + return; + } + + // No spatial index on _glm_points is needed: the seed lookup and the + // point delete both drive *from* this (small) table and probe the source / + // destination endpoint indexes. ANALYZE so the planner knows it is small + // and drives the joins from it. 
+ dbexec("ANALYZE _glm_points"); + + // Step 2: Find the nodes (endpoint points) of every connected component + // touched by the change. We seed from the lines that have a changed point + // as one of their endpoints and walk out along shared endpoints, staying + // within the same grouping key, until each connected component is fully + // explored. The walk matches endpoints with exact geometry equality (so the + // functional btree indexes on the endpoint points can be used) and + // de-duplicates (group, point) pairs, which guarantees termination. All + // groups meeting at a changed point are seeded, which keeps the by-point + // delete in step 4 self-correcting (anything it removes that should survive + // is regenerated). The 'where' filter is applied everywhere a source row is + // read, so excluded lines neither seed nor extend a component. The grouping + // values are carried under "gk_"-prefixed names so the unqualified 'where' + // filter is never ambiguous against them. + timer(m_timer_walk).start(); + dbexec(R"( +CREATE TEMP TABLE _glm_nodes ON COMMIT DROP AS +WITH RECURSIVE +seeds AS ( + -- The lines that touch a changed endpoint, found via the source endpoint + -- index (exact equality, no tile-area scan). 
+ SELECT {group_cols_l}, l."{geom_column}" + FROM _glm_points p + JOIN {src} l + ON ( ST_StartPoint(l."{geom_column}") = p.pt + OR ST_EndPoint(l."{geom_column}") = p.pt ) + AND {where} +), +nodes AS ( + SELECT b.* FROM ( + SELECT {group_cols_gk}, ST_StartPoint("{geom_column}") AS pt FROM seeds + UNION + SELECT {group_cols_gk}, ST_EndPoint("{geom_column}") FROM seeds + ) b + UNION + SELECT {group_cols_l_gk}, + CASE WHEN ST_StartPoint(l."{geom_column}") = n.pt + THEN ST_EndPoint(l."{geom_column}") + ELSE ST_StartPoint(l."{geom_column}") END + FROM nodes n + JOIN {src} l + ON {group_join} + AND ( ST_StartPoint(l."{geom_column}") = n.pt + OR ST_EndPoint(l."{geom_column}") = n.pt ) + AND {where} +) +SELECT * FROM nodes +)"); + dbexec("ANALYZE _glm_nodes"); + + // Step 3: Collect the actual member lines of those components: every source + // line of the right group that touches a reached node at one of its + // endpoints. + auto const ways = dbexec(R"( +CREATE TEMP TABLE _glm_ways ON COMMIT DROP AS +SELECT DISTINCT ON (l.ctid) {group_cols_l}, l."{geom_column}" + FROM _glm_nodes n + JOIN {src} l + ON {group_join} + AND ( ST_StartPoint(l."{geom_column}") = n.pt + OR ST_EndPoint(l."{geom_column}") = n.pt ) + AND {where} + ORDER BY l.ctid +)"); + timer(m_timer_walk).stop(); + log_gen("Collected {} member lines in affected components.", + ways.affected_rows()); + + // Step 4: Delete the existing merged outputs of the touched components. + // A merged output is part of a touched component if and only if one of its + // endpoints coincides exactly with a reached node of the same group (the + // walk only connects lines that share an endpoint exactly, so this matches + // the walk's notion of connectivity and will not delete a different + // component that merely crosses one mid-segment). This must use exact + // endpoint equality, not ST_Intersects, to avoid deleting unrelated + // same-group lines that cross a component. 
A second pass deletes any output + // whose endpoint coincides with a changed point directly; this cleans up + // components that disappeared entirely (all their lines deleted), which + // leave no reached nodes behind. It is group-agnostic on purpose and stays + // self-correcting because step 2 seeds every group meeting at a changed + // point, so anything removed here that should survive is regenerated below. + timer(m_timer_delete).start(); + auto deleted = dbexec(R"( +DELETE FROM {dest} d + USING _glm_nodes n + WHERE {group_join_dn} + AND ( ST_StartPoint(d."{geom_column}") = n.pt + OR ST_EndPoint(d."{geom_column}") = n.pt ) +)"); + auto const deleted_by_nodes = deleted.affected_rows(); + deleted = dbexec(R"( +DELETE FROM {dest} d + USING _glm_points p + WHERE ST_StartPoint(d."{geom_column}") = p.pt + OR ST_EndPoint(d."{geom_column}") = p.pt +)"); + timer(m_timer_delete).stop(); + log_gen("Deleted {} stale merged linestrings ({} by node, {} by point).", + deleted_by_nodes + deleted.affected_rows(), deleted_by_nodes, + deleted.affected_rows()); + + // Step 5: Regenerate the affected components from scratch. + timer(m_timer_merge).start(); + auto const inserted = dbexec(R"( +INSERT INTO {dest} ({group_cols}, "{geom_column}") + SELECT {group_cols}, + (ST_Dump(ST_LineMerge(ST_Collect("{geom_column}" + ORDER BY "{geom_column}")))).geom + FROM _glm_ways + GROUP BY {group_cols} +)"); + timer(m_timer_merge).stop(); + log_gen("Inserted {} merged linestrings.", inserted.affected_rows()); + + connection().exec("COMMIT"); +} diff --git a/src/gen/gen-grouped-linemerge.hpp b/src/gen/gen-grouped-linemerge.hpp new file mode 100644 index 000000000..ce1c4993c --- /dev/null +++ b/src/gen/gen-grouped-linemerge.hpp @@ -0,0 +1,67 @@ +#ifndef OSM2PGSQL_GEN_GROUPED_LINEMERGE_HPP +#define OSM2PGSQL_GEN_GROUPED_LINEMERGE_HPP + +/** + * SPDX-License-Identifier: GPL-2.0-or-later + * + * This file is part of osm2pgsql (https://osm2pgsql.org/). 
+ * + * Copyright (C) 2006-2026 by the osm2pgsql developer community. + * For a full list of authors see the git log. + */ + +#include "gen-base.hpp" + +#include +#include + +class params_t; +class pg_conn_t; + +/** + * Generalization strategy "grouped-linemerge". + * + * Globally merges connected LineStrings that share the same set of grouping + * column values, equivalent to + * + * INSERT INTO dest (cols..., geom) + * SELECT cols..., (ST_Dump(ST_LineMerge(ST_Collect(geom)))).geom + * FROM src GROUP BY cols...; + * + * Unlike the tile-based strategies this does NOT clip to tiles and the + * destination geometries are global merged lines, not tile-keyed rows. + * + * In append (update) mode the work is done incrementally and locally: the + * endpoint table (filled by an osm2pgsql expire output during the update) + * holds the exact endpoints of every changed line and is consumed as seed. + * From the seeds we walk the connected component(s) of matching lines (via a + * recursive query), delete the merged outputs that have an endpoint on a + * reached node or changed point and regenerate them from scratch. This keeps + * each update bounded to the local connected component(s) touched. + */ +class gen_grouped_linemerge_t : public gen_base_t +{ +public: + gen_grouped_linemerge_t(pg_conn_t *connection, bool append, + params_t *params); + + void process() override; + + std::string_view strategy() const noexcept override + { + return "grouped-linemerge"; + } + +private: + /// Build the whole dest table from scratch (create mode). + void process_create(); + + /// Incrementally update the dest table from the endpoint table (append mode). 
+ void process_append(); + + std::size_t m_timer_merge; + std::size_t m_timer_walk; + std::size_t m_timer_delete; +}; + +#endif // OSM2PGSQL_GEN_GROUPED_LINEMERGE_HPP diff --git a/src/output-flex.cpp b/src/output-flex.cpp index 379e111d6..4af59be08 100644 --- a/src/output-flex.cpp +++ b/src/output-flex.cpp @@ -281,14 +281,16 @@ void create_expire_tables(std::vector const &expire_outputs, { if (std::all_of(expire_outputs.cbegin(), expire_outputs.cend(), [](auto const &expire_output) { - return expire_output.table().empty(); + return expire_output.table().empty() && + expire_output.endpoint_table().empty(); })) { return; } pg_conn_t const connection{connection_params, "out.flex.expire"}; for (auto const &expire_output : expire_outputs) { - if (!expire_output.table().empty()) { + if (!expire_output.table().empty() || + !expire_output.endpoint_table().empty()) { expire_output.create_output_table(connection); } } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 93f022e7a..1f8e11482 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -5,9 +5,9 @@ # name - Name of test (source file without suffix) # labels - optional labels for the test function(set_test test_name) - cmake_parse_arguments(test_param "" "" "LABELS" ${ARGN}) + cmake_parse_arguments(test_param "" "" "LABELS;EXTRA_SOURCES" ${ARGN}) - add_executable(${test_name} ${test_name}.cpp) + add_executable(${test_name} ${test_name}.cpp ${test_param_EXTRA_SOURCES}) target_link_libraries(${test_name} osm2pgsql_lib catch_main_lib) add_test(NAME ${test_name} COMMAND ${test_name}) @@ -41,6 +41,11 @@ target_compile_features(catch_main_lib PUBLIC cxx_std_17) set_test(test-check-input LABELS NoDB) set_test(test-db-copy-mgr) set_test(test-db-copy-thread) +# The generalization strategies live in the osm2pgsql-gen executable, not in +# osm2pgsql_lib, so the test compiles the needed sources directly. 
+set_test(test-gen-grouped-linemerge EXTRA_SOURCES + "${osm2pgsql_SOURCE_DIR}/src/gen/gen-base.cpp" + "${osm2pgsql_SOURCE_DIR}/src/gen/gen-grouped-linemerge.cpp") set_test(test-expire-from-geometry LABELS NoDB) set_test(test-expire-tiles LABELS NoDB) set_test(test-flex-indexes LABELS NoDB) diff --git a/tests/bdd/flex/run-with-endpoint-expire.feature b/tests/bdd/flex/run-with-endpoint-expire.feature new file mode 100644 index 000000000..955333450 --- /dev/null +++ b/tests/bdd/flex/run-with-endpoint-expire.feature @@ -0,0 +1,40 @@ +Feature: Expire changed-geometry endpoints into a table + + Background: + Given the lua style + """ + local eo = osm2pgsql.define_expire_output({ + endpoint_table = 'changed_endpoints' + }) + local t = osm2pgsql.define_way_table('ways', { + { column = 'geom', type = 'linestring', expire = eo }, + { column = 'tags', type = 'jsonb' } + }) + + function osm2pgsql.process_way(object) + if object.tags.highway then + t:insert({ + tags = object.tags, + geom = object:as_linestring() + }) + end + end + """ + + Scenario: Endpoints of a changed way are written to the endpoint table + Given the input file 'liechtenstein-2013-08-03.osm.pbf' + When running osm2pgsql flex with parameters + | --slim | -c | + # The initial import does not run expiry, so no endpoints are recorded. + Then table changed_endpoints has 0 rows + + Given the OSM data + """ + n100 v1 dV x10.0 y47.0 + n101 v1 dV x10.001 y47.0 + w200 v1 dV Nn100,n101 Thighway=residential + """ + When running osm2pgsql flex with parameters + | --slim | -a | + # The added way contributes its two distinct endpoints. + Then table changed_endpoints has 2 rows diff --git a/tests/test-gen-grouped-linemerge.cpp b/tests/test-gen-grouped-linemerge.cpp new file mode 100644 index 000000000..db6fb337e --- /dev/null +++ b/tests/test-gen-grouped-linemerge.cpp @@ -0,0 +1,319 @@ +/** + * SPDX-License-Identifier: GPL-2.0-or-later + * + * This file is part of osm2pgsql (https://osm2pgsql.org/). 
+ * + * Copyright (C) 2006-2026 by the osm2pgsql developer community. + * For a full list of authors see the git log. + */ + +#include + +#include "common-pg.hpp" +#include "format.hpp" +#include "gen/gen-grouped-linemerge.hpp" +#include "params.hpp" +#include "pgsql.hpp" + +#include +#include +#include +#include +#include +#include + +namespace { + +testing::pg::tempdb_t db; + +// Run the grouped-linemerge strategy (create or append) against the test +// tables. conn_t is a pg_conn_t, so it can be passed straight to the strategy. +void run_gen(testing::pg::conn_t &conn, bool append, + char const *src_table = "glm_lines", + char const *dest_table = "glm_merged", + char const *group_by_columns = "grp", + char const *where = nullptr) +{ + params_t params; + params.set("schema", "public"); + params.set("src_table", src_table); + params.set("dest_table", dest_table); + params.set("geom_column", "geom"); + params.set("group_by_columns", group_by_columns); + if (where != nullptr) { + params.set("where", where); + } + params.set("endpoint_table", "glm_exp"); + + gen_grouped_linemerge_t gen{&conn, append, ¶ms}; + gen.process(); +} + +void setup_tables(testing::pg::conn_t &conn) +{ + conn.exec("DROP TABLE IF EXISTS glm_lines, glm_merged, glm_exp, glm_ref" + " CASCADE"); + conn.exec("CREATE TABLE glm_lines" + " (grp text, geom geometry(LineString, 3857) NOT NULL)"); + conn.exec("CREATE INDEX ON glm_lines USING gist (geom)"); + conn.exec("CREATE TABLE glm_merged (grp text, geom geometry NOT NULL)"); + conn.exec("CREATE INDEX ON glm_merged USING gist (geom)"); + // The change signal: the exact endpoints of changed ways (what osm2pgsql's + // expire output with an 'endpoint_table' writes during an update). 
+ conn.exec("CREATE TABLE glm_exp (geom geometry(Point, 3857) NOT NULL)"); +} + +void insert_edge(testing::pg::conn_t &conn, std::string const &grp, + std::string const &wkt) +{ + conn.exec(fmt::format("INSERT INTO glm_lines (grp, geom)" + " VALUES ('{}', ST_GeomFromText('{}', 3857))", + grp, wkt)); +} + +void delete_edge(testing::pg::conn_t &conn, std::string const &grp, + std::string const &wkt) +{ + conn.exec(fmt::format("DELETE FROM glm_lines WHERE grp = '{}'" + " AND ST_Equals(geom, ST_GeomFromText('{}', 3857))", + grp, wkt)); +} + +// Record the exact endpoints of a changed way, exactly as osm2pgsql's expire +// output with an 'endpoint_table' would during an update (start and end point +// of the changed geometry). +void expire(testing::pg::conn_t &conn, std::string const &wkt) +{ + conn.exec(fmt::format( + "INSERT INTO glm_exp (geom)" + " VALUES (ST_StartPoint(ST_GeomFromText('{0}', 3857)))," + " (ST_EndPoint(ST_GeomFromText('{0}', 3857)))", + wkt)); +} + +// Whether the strategy's output equals what a from-scratch GROUP BY + +// ST_LineMerge would produce on the current source. Compares as a set +// (geometric equality, group-aware, NULL-safe) AND on row count, so leftover +// duplicates or missing pieces are both caught. 
+bool matches_reference(testing::pg::conn_t &conn) +{ + conn.exec("DROP TABLE IF EXISTS glm_ref"); + conn.exec("CREATE TABLE glm_ref AS" + " SELECT grp, (ST_Dump(ST_LineMerge(ST_Collect(geom)))).geom AS geom" + " FROM glm_lines GROUP BY grp"); + + int const ref = conn.result_as_int("SELECT count(*) FROM glm_ref"); + int const strat = conn.result_as_int("SELECT count(*) FROM glm_merged"); + int const strat_extra = conn.result_as_int( + "SELECT count(*) FROM glm_merged m WHERE NOT EXISTS (SELECT 1 FROM" + " glm_ref r WHERE r.grp IS NOT DISTINCT FROM m.grp" + " AND ST_Equals(r.geom, m.geom))"); + int const ref_extra = conn.result_as_int( + "SELECT count(*) FROM glm_ref r WHERE NOT EXISTS (SELECT 1 FROM" + " glm_merged m WHERE r.grp IS NOT DISTINCT FROM m.grp" + " AND ST_Equals(r.geom, m.geom))"); + + INFO("reference=" << ref << " strategy=" << strat + << " strategy_only=" << strat_extra + << " reference_only=" << ref_extra); + return ref == strat && strat_extra == 0 && ref_extra == 0; +} + +struct edge_t +{ + std::string wkt; + bool present = false; + std::string grp; + + explicit edge_t(std::string w) : wkt(std::move(w)) {} +}; + +// All horizontal and vertical segments of a GW x GH grid (the candidate +// "ways"). Interior grid points become degree-3+ junctions when same-group +// edges meet there, which is exactly the case that makes ST_LineMerge split a +// connected component into several output lines. 
+std::vector build_grid_edges() +{ + constexpr int GW = 4; + constexpr int GH = 4; + constexpr int STEP = 500; + std::vector edges; + auto const seg = [](int x1, int y1, int x2, int y2) { + return fmt::format("LINESTRING({} {},{} {})", x1, y1, x2, y2); + }; + for (int j = 0; j < GH; ++j) { + for (int i = 0; i < GW - 1; ++i) { + edges.emplace_back(seg(i * STEP, j * STEP, (i + 1) * STEP, j * STEP)); + } + } + for (int i = 0; i < GW; ++i) { + for (int j = 0; j < GH - 1; ++j) { + edges.emplace_back(seg(i * STEP, j * STEP, i * STEP, (j + 1) * STEP)); + } + } + return edges; +} + +constexpr std::array GROUPS = {"a", "b", "c"}; + +} // anonymous namespace + +TEST_CASE("grouped-linemerge: create merges connected same-group lines") +{ + auto conn = db.connect(); + setup_tables(conn); + + // "a": two connected segments; "b": one segment touching them but in a + // different group (must not merge); plus a disjoint "a" segment far away. + insert_edge(conn, "a", "LINESTRING(0 0,500 0)"); + insert_edge(conn, "a", "LINESTRING(500 0,1000 0)"); + insert_edge(conn, "b", "LINESTRING(500 0,500 500)"); + insert_edge(conn, "a", "LINESTRING(5000 0,5500 0)"); + + run_gen(conn, false); + + REQUIRE(matches_reference(conn)); + // "a" -> merged (0..1000) + disjoint (5000..5500) = 2 rows; "b" -> 1 row. + CHECK(conn.get_count("glm_merged") == 3); + CHECK(conn.get_count("glm_merged", "grp = 'a'") == 2); +} + +TEST_CASE("grouped-linemerge: incremental updates match full re-merge (fuzz)") +{ + auto conn = db.connect(); + + constexpr int OPS_PER_SEED = 120; + + for (unsigned const seed : {1U, 2U, 3U, 4U}) { + setup_tables(conn); + auto edges = build_grid_edges(); + std::mt19937 rng{seed}; + + // Random initial population, then a from-scratch build. 
+ for (auto &e : edges) { + if (rng() % 2U == 0U) { + e.present = true; + e.grp = GROUPS.at(rng() % 3U); + insert_edge(conn, e.grp, e.wkt); + } + } + run_gen(conn, false); + INFO("seed=" << seed << " phase=create"); + REQUIRE(matches_reference(conn)); + + // Random connect/disconnect operations, each followed by an + // incremental append that must reproduce the from-scratch result. + for (int op = 0; op < OPS_PER_SEED; ++op) { + std::vector present; + std::vector absent; + for (std::size_t i = 0; i < edges.size(); ++i) { + (edges[i].present ? present : absent).push_back(i); + } + + std::string desc; + bool const do_add = + !absent.empty() && (present.empty() || (rng() % 2U == 0U)); + if (do_add) { + auto const idx = absent[rng() % absent.size()]; + auto &e = edges[idx]; + e.grp = GROUPS.at(rng() % 3U); // may differ from last time: a retag + e.present = true; + insert_edge(conn, e.grp, e.wkt); + expire(conn, e.wkt); + desc = fmt::format("add slot={} grp={}", idx, e.grp); + } else if (!present.empty()) { + auto const idx = present[rng() % present.size()]; + auto &e = edges[idx]; + expire(conn, e.wkt); // expire the old footprint, then remove it + delete_edge(conn, e.grp, e.wkt); + e.present = false; + desc = fmt::format("del slot={} grp={}", idx, e.grp); + } else { + continue; + } + + run_gen(conn, true); + + INFO("seed=" << seed << " op=" << op << " " << desc); + REQUIRE(matches_reference(conn)); + } + } +} + +TEST_CASE("grouped-linemerge: multi-column grouping, NULL keys, where filter") +{ + auto conn = db.connect(); + conn.exec("DROP TABLE IF EXISTS glm2_lines, glm2_merged, glm_exp, glm2_ref" + " CASCADE"); + conn.exec("CREATE TABLE glm2_lines" + " (name text, ref text, geom geometry(LineString, 3857) NOT NULL)"); + conn.exec("CREATE INDEX ON glm2_lines USING gist (geom)"); + conn.exec("CREATE TABLE glm2_merged" + " (name text, ref text, geom geometry NOT NULL)"); + conn.exec("CREATE INDEX ON glm2_merged USING gist (geom)"); + conn.exec("CREATE TABLE glm_exp 
(geom geometry(Point, 3857) NOT NULL)"); + + char const *const cols = "name, ref"; + char const *const filter = "(name IS NOT NULL OR ref IS NOT NULL)"; + + auto ins = [&](char const *name, char const *ref, std::string const &wkt) { + conn.exec(fmt::format( + "INSERT INTO glm2_lines (name, ref, geom) VALUES ({}, {}," + " ST_GeomFromText('{}', 3857))", + name ? fmt::format("'{}'", name) : "NULL", + ref ? fmt::format("'{}'", ref) : "NULL", wkt)); + }; + + // The reference applies the same filter and multi-column grouping; the + // match is group-aware (NULL-safe) and geometric. + auto matches = [&]() { + conn.exec("DROP TABLE IF EXISTS glm2_ref"); + conn.exec(fmt::format( + "CREATE TABLE glm2_ref AS SELECT {0}," + " (ST_Dump(ST_LineMerge(ST_Collect(geom)))).geom AS geom" + " FROM glm2_lines WHERE {1} GROUP BY {0}", + cols, filter)); + int const ref = conn.result_as_int("SELECT count(*) FROM glm2_ref"); + int const strat = conn.result_as_int("SELECT count(*) FROM glm2_merged"); + int const se = conn.result_as_int( + "SELECT count(*) FROM glm2_merged m WHERE NOT EXISTS (SELECT 1 FROM" + " glm2_ref r WHERE r.name IS NOT DISTINCT FROM m.name" + " AND r.ref IS NOT DISTINCT FROM m.ref AND ST_Equals(r.geom, m.geom))"); + int const re = conn.result_as_int( + "SELECT count(*) FROM glm2_ref r WHERE NOT EXISTS (SELECT 1 FROM" + " glm2_merged m WHERE r.name IS NOT DISTINCT FROM m.name" + " AND r.ref IS NOT DISTINCT FROM m.ref AND ST_Equals(r.geom, m.geom))"); + INFO("reference=" << ref << " strategy=" << strat << " strategy_only=" + << se << " reference_only=" << re); + return ref == strat && se == 0 && re == 0; + }; + + // 'Main St'/NULL: three connected segments -> one line. + ins("Main St", nullptr, "LINESTRING(0 0,500 0)"); + ins("Main St", nullptr, "LINESTRING(500 0,1000 0)"); + ins("Main St", nullptr, "LINESTRING(1000 0,1500 0)"); + // NULL/'I 5': two connected segments -> one line (NULL name groups). 
+ ins(nullptr, "I 5", "LINESTRING(0 500,500 500)"); + ins(nullptr, "I 5", "LINESTRING(500 500,1000 500)"); + // NULL/NULL: excluded by the filter entirely. + ins(nullptr, nullptr, "LINESTRING(0 1000,500 1000)"); + // 'Main St'/'I 5': a distinct group (differs from 'Main St'/NULL on ref). + ins("Main St", "I 5", "LINESTRING(0 1500,500 1500)"); + + run_gen(conn, false, "glm2_lines", "glm2_merged", cols, filter); + INFO("phase=create"); + REQUIRE(matches()); + CHECK(conn.get_count("glm2_merged", "name IS NULL AND ref IS NULL") == 0); + CHECK(conn.get_count("glm2_merged", "name = 'Main St' AND ref IS NULL") == 1); + + // Incremental: remove the middle 'Main St'/NULL segment; the component must + // split into two, with everything else untouched. + expire(conn, "LINESTRING(500 0,1000 0)"); + conn.exec("DELETE FROM glm2_lines WHERE name = 'Main St' AND ref IS NULL" + " AND ST_Equals(geom, ST_GeomFromText('LINESTRING(500 0,1000 0)'," + " 3857))"); + run_gen(conn, true, "glm2_lines", "glm2_merged", cols, filter); + INFO("phase=append-shatter"); + REQUIRE(matches()); + CHECK(conn.get_count("glm2_merged", "name = 'Main St' AND ref IS NULL") == 2); +}