diff --git a/api/src/org/labkey/api/exp/Lsid.java b/api/src/org/labkey/api/exp/Lsid.java index 0cfa516eba4..2ddfee94a31 100644 --- a/api/src/org/labkey/api/exp/Lsid.java +++ b/api/src/org/labkey/api/exp/Lsid.java @@ -308,9 +308,9 @@ static public String namespaceLikeString(String namespace) return "urn:lsid:%:" + namespace + ".%:%"; } - static public String namespaceFilter(String columnName, String namespace) + static public SQLFragment namespaceFilter(Enum column, String namespace) { - return columnName + " LIKE '" + namespaceLikeString(namespace) + "'"; + return new SQLFragment().appendIdentifier(column.name()).append(" LIKE ?").add(namespaceLikeString(namespace)); } /** diff --git a/api/src/org/labkey/api/exp/api/ExperimentService.java b/api/src/org/labkey/api/exp/api/ExperimentService.java index ce8efc452e8..100df3d2af9 100644 --- a/api/src/org/labkey/api/exp/api/ExperimentService.java +++ b/api/src/org/labkey/api/exp/api/ExperimentService.java @@ -188,8 +188,23 @@ enum DataTypeForExclusion List getExpRuns(Container container, @Nullable ExpProtocol parentProtocol, @Nullable ExpProtocol childProtocol); List getExpRuns(Container container, @Nullable ExpProtocol parentProtocol, @Nullable ExpProtocol childProtocol, @NotNull Predicate filterFn); - - List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate filterFn, @NotNull Container container); + + /** + * @param filterSQL optional additional WHERE predicates; callers doing keyset pagination should include + * {@code ER.RowId > minRowId} here + * @param limit max rows to return; pass {@code Table.ALL_ROWS} (-1) for no limit + * @return up to {@code limit} ExpRuns in {@code container} matching {@code filterSQL}, ordered by RowId + */ + List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate filterFn, @NotNull Container container, int limit); + + /** + * @param modifiedSince optional upper-exclusive Modified cutoff; pass {@code null} to return all batches + * @param minRowId keyset cursor — only batches with RowId > minRowId are returned; pass 0 for the first page + * @param limit max rows to return + * @return up to {@code limit} assay batches for {@code batchProtocol} in {@code container} with + * RowId > minRowId (and Modified > modifiedSince when non-null), ordered by RowId + */ + List getExpBatches(@NotNull Container container, @NotNull ExpProtocol batchProtocol, @Nullable Date modifiedSince, long minRowId, int limit); List getExpRunsForJobId(long jobId); diff --git a/api/src/org/labkey/api/search/SearchService.java b/api/src/org/labkey/api/search/SearchService.java index 48a2632c59a..d1f6ef82af3 100644 --- a/api/src/org/labkey/api/search/SearchService.java +++ b/api/src/org/labkey/api/search/SearchService.java @@ -75,6 +75,7 @@ public interface SearchService extends SearchMXBean Logger _log = LogHelper.getLogger(SearchService.class, "Full text search service"); long DEFAULT_FILE_SIZE_LIMIT = 100L; // 100 MB + int INDEXING_LIMIT = 1_000; /** * Returns the max file size indexed @@ -494,12 +495,21 @@ public String normalizeHref(Path contextPath, Container c) interface DocumentProvider { /** - * Enumerate documents for full-text search. Unless it's known there will be a small number of documents - * added to the queue, add Runnable to the IndexTask that adds the Resources from the container to the queue. - * If there are potentially many documents for a container, add resources in batches of 1,000 or so to avoid - * a huge memory footprint. + * Enumerate documents for full-text search indexing. Do NOT fetch an unbounded result set into memory. * - * @param modifiedSince when null, do a full reindex; otherwise incremental (either modified > modifiedSince, or modified > lastIndexed) + *

Pattern 1 — recursive requeue (preferred when the underlying table supports keyset pagination). + * Fetch at most {@link SearchService#INDEXING_LIMIT} rows, process them, then re-enqueue the next batch + * only if the batch was full. This keeps the ResultSet closed between batches and interleaves with other + * queue work. See {@code ExperimentServiceImpl.indexMaterials()} and + * {@code AssayManager.indexAssayRuns()} for examples.

+ * + *

Pattern 2 — forEachBatch + per-batch runnable (simpler when using {@code TableSelector}). + * Stream rows in batches of {@link SearchService#INDEXING_LIMIT} and wrap each batch in a + * {@code queue.addRunnable()} so indexing is deferred. See + * {@code InventoryManager.indexLocations()} and {@code NotebookManager.indexNotebooks()} + * for examples.

+ * + * @param modifiedSince when null, do a full reindex; otherwise incremental (either modified > modifiedSince, or modified > lastIndexed) */ void enumerateDocuments(TaskIndexingQueue adder, @Nullable Date modifiedSince); diff --git a/assay/src/org/labkey/assay/AssayManager.java b/assay/src/org/labkey/assay/AssayManager.java index 871591e59cd..4199fb734cc 100644 --- a/assay/src/org/labkey/assay/AssayManager.java +++ b/assay/src/org/labkey/assay/AssayManager.java @@ -16,6 +16,7 @@ package org.labkey.assay; +import org.apache.commons.lang3.mutable.MutableLong; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -710,33 +711,51 @@ public void indexAssayBatches(SearchService.TaskIndexingQueue queue, @Nullable D for (ExpProtocol protocol : getAssayProtocols(queue.getContainer())) { if (shouldIndexProtocolBatches(protocol)) - indexAssayBatches(queue, protocol, modifiedSince); + queue.addRunnable((q) -> indexAssayBatches(q, protocol, modifiedSince, 0)); } } - private void indexAssayBatches(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, @Nullable Date modifiedSince) + private void indexAssayBatches(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, + @Nullable Date modifiedSince, long minRowId) { - if (shouldIndexProtocolBatches(protocol)) - { - for (ExpExperiment batch : protocol.getBatches(queue.getContainer())) - { - if (modifiedSince == null || modifiedSince.before(batch.getModified())) - indexAssayBatch(queue, batch); - } - } + List batches = ExperimentService.get().getExpBatches( + queue.getContainer(), protocol, modifiedSince, minRowId, SearchService.INDEXING_LIMIT); + + MutableLong maxRowIdProcessed = new MutableLong(minRowId); + batches.forEach(b -> { + indexAssayBatch(queue, b); + maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), b.getRowId())); + }); + + if (batches.size() == SearchService.INDEXING_LIMIT) + queue.addRunnable((q) -> indexAssayBatches(q, protocol, modifiedSince, maxRowIdProcessed.longValue())); } public void indexAssayRuns(SearchService.TaskIndexingQueue queue, @Nullable Date modifiedSince) { for (ExpProtocol protocol : getAssayProtocols(queue.getContainer())) - indexAssayRuns(queue, protocol, modifiedSince); + queue.addRunnable((q) -> indexAssayRuns(q, protocol, modifiedSince, 0)); } - private void indexAssayRuns(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, @Nullable Date modifiedSince) + private void indexAssayRuns(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, + @Nullable Date modifiedSince, long minRowId) { - ExperimentService.get().getExpRuns(queue.getContainer(), protocol, null, run -> - modifiedSince == null || modifiedSince.before(run.getModified()) - ).forEach(r -> indexAssayRun(queue, r)); + SQLFragment filterSQL = new SQLFragment("ER.ProtocolLSID = ? AND ER.RowId > ?") + .add(protocol.getLSID()) + .add(minRowId); + if (modifiedSince != null) + filterSQL.append(" AND ER.Modified > ?").add(modifiedSince); + + List runs = ExperimentService.get().getExpRuns(filterSQL, _ -> true, queue.getContainer(), SearchService.INDEXING_LIMIT); + + MutableLong maxRowIdProcessed = new MutableLong(minRowId); + runs.forEach(r -> { + indexAssayRun(queue, r); + maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), r.getRowId())); + }); + + if (runs.size() == SearchService.INDEXING_LIMIT) + queue.addRunnable((q) -> indexAssayRuns(q, protocol, modifiedSince, maxRowIdProcessed.longValue())); } @Override diff --git a/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java b/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java index bf51266dc3f..7186bda9205 100644 --- a/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java @@ -564,7 +564,7 @@ public List getExpRuns(Container container, @Nullable ExpProtocol pa sql.add(childProtocol.getLSID()); } - return getExpRuns(sql, filterFn, container); + return getExpRuns(sql, filterFn, container, Table.ALL_ROWS); } @Override @@ -582,17 +582,17 @@ public boolean hasExpRuns(Container container, @NotNull Predicate filter } @Override - public List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate filterFn, @NotNull Container container) + public List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate filterFn, @NotNull Container container, int limit) { - SQLFragment sql = new SQLFragment(" SELECT ER.* " - + " FROM exp.ExperimentRun ER " - + " WHERE ER.Container = ? "); + SQLFragment sql = new SQLFragment("SELECT ER.* FROM exp.ExperimentRun ER WHERE ER.Container = ?"); sql.add(container.getId()); - if (null != filterSQL && !filterSQL.isEmpty()) - sql.append(" AND " ).append(filterSQL); - - sql.append(" ORDER BY ER.RowId "); + sql.append(" AND ").append(filterSQL); + sql.append(" ORDER BY ER.RowId"); + if (limit > 0) + { + sql = getSchema().getSqlDialect().limitRows(sql, limit); + } try (Stream runs = new SqlSelector(getSchema(), sql).setJdbcCaching(false).uncachedStream(ExperimentRun.class)) { @@ -600,6 +600,25 @@ public List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Pre } } + @Override + public List getExpBatches(@NotNull Container container, @NotNull ExpProtocol batchProtocol, + @Nullable Date modifiedSince, long minRowId, int limit) + { + SQLFragment sql = new SQLFragment("SELECT E.* FROM ").append(getTinfoExperiment(), "E") + .append(" WHERE E.Container = ?").add(container.getId()) + .append(" AND E.BatchProtocolId = ?").add(batchProtocol.getRowId()) + .append(" AND E.RowId > ?").add(minRowId); + if (modifiedSince != null) + sql.append(" AND E.Modified > ?").add(modifiedSince); + sql.append(" ORDER BY E.RowId"); + if (limit > 0) + { + sql = getSchema().getSqlDialect().limitRows(sql, limit); + } + + return ExpExperimentImpl.fromExperiments(new SqlSelector(getSchema(), sql).setJdbcCaching(false).getArray(Experiment.class)); + } + @Override public List getExpRunsForJobId(long jobId) { @@ -745,7 +764,7 @@ public List getExpDatas(Container container, @Nullable DataType typ { SimpleFilter filter = SimpleFilter.createContainerFilter(container); if (type != null) - filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID.name(), type.getNamespacePrefix()), null); + filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID, type.getNamespacePrefix())); if (name != null) filter.addCondition(FieldKey.fromParts(ExpDataTable.Column.Name.name()), name); @@ -756,7 +775,7 @@ public List getOutputDatas(long runRowId, @Nullable DataType type) { SimpleFilter filter = new SimpleFilter(FieldKey.fromParts("RunId"), runRowId); if (type != null) - filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID.name(), type.getNamespacePrefix()), null); + filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID, type.getNamespacePrefix())); return getExpDatas(filter); } @@ -1018,8 +1037,6 @@ public List getExpMaterialsByObjectId(ContainerFilter container return result; } - private static final int INDEXING_LIMIT = 1_000; - @Override public void enumerateDocuments(SearchService.TaskIndexingQueue queue, final Date modifiedSince) { @@ -1076,7 +1093,7 @@ private void indexMaterials(final @NotNull SearchService.TaskIndexingQueue queue if (!modifiedSQL.isEmpty()) sql.append(" AND ").append(modifiedSQL); sql.append(" ORDER BY RowId"); - sql = getSchema().getSqlDialect().limitRows(sql, INDEXING_LIMIT); + sql = getSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT); SqlSelector selector = new SqlSelector(getSchema(), sql); selector.setJdbcCaching(false); MutableLong maxRowIdProcessed = new MutableLong(minRowId); @@ -1089,7 +1106,7 @@ private void indexMaterials(final @NotNull SearchService.TaskIndexingQueue queue maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), expMaterial.getRowId())); }); - if (materials.size() == INDEXING_LIMIT) + if (materials.size() == SearchService.INDEXING_LIMIT) { // Requeue for the next batch. This avoids overwhelming the indexer's queue with documents queue.addRunnable((q) -> indexMaterials(q, modifiedSince, maxRowIdProcessed.longValue())); @@ -1114,7 +1131,7 @@ public void indexData(final @NotNull SearchService.TaskIndexingQueue queue, fina sql.append(" AND ").append(modifiedSQL); sql.append(" ORDER BY RowId"); - sql = getSchema().getSqlDialect().limitRows(sql, INDEXING_LIMIT); + sql = getSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT); SqlSelector selector = new SqlSelector(getSchema(), sql); selector.setJdbcCaching(false); MutableLong maxRowIdProcessed = new MutableLong(minRowId); @@ -1127,7 +1144,7 @@ public void indexData(final @NotNull SearchService.TaskIndexingQueue queue, fina maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), expData.getRowId())); }); - if (data.size() == INDEXING_LIMIT) + if (data.size() == SearchService.INDEXING_LIMIT) { // Requeue for the next batch. This avoids overwhelming the indexer's queue with documents queue.addRunnable((q) -> indexData(q, modifiedSince, maxRowIdProcessed.longValue())); diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java index 8cbd4c9ad87..a692f2f9091 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java @@ -17,6 +17,7 @@ package org.labkey.experiment.api; import org.apache.commons.collections4.ListUtils; +import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Strings; import org.apache.commons.math3.util.Precision; @@ -346,11 +347,11 @@ public void indexSampleType(ExpSampleType sampleType, SearchService.TaskIndexing impl.index(q, null); } - indexSampleTypeMaterials(sampleType, q); + indexSampleTypeMaterials(sampleType, q, 0); }); } - private void indexSampleTypeMaterials(ExpSampleType sampleType, SearchService.TaskIndexingQueue queue) + private void indexSampleTypeMaterials(ExpSampleType sampleType, SearchService.TaskIndexingQueue queue, long minRowId) { // Index all ExpMaterial that have never been indexed OR where either the ExpSampleType definition or ExpMaterial itself has changed since last indexed SQLFragment sql = new SQLFragment("SELECT m.* FROM ") @@ -359,17 +360,28 @@ private void indexSampleTypeMaterials(ExpSampleType sampleType, SearchService.Ta .append(ExperimentServiceImpl.get().getTinfoMaterialIndexed(), "mi") .append(" ON m.RowId = mi.MaterialId WHERE m.LSID NOT LIKE ").appendValue("%:" + StudyService.SPECIMEN_NAMESPACE_PREFIX + "%", getExpSchema().getSqlDialect()) .append(" AND m.cpasType = ?").add(sampleType.getLSID()) + .append(" AND m.RowId > ?").add(minRowId) .append(" AND (mi.lastIndexed IS NULL OR mi.lastIndexed < ? OR (m.modified IS NOT NULL AND mi.lastIndexed < m.modified))") .append(" ORDER BY m.RowId") // Issue 51263: order by RowId to reduce deadlock .add(sampleType.getModified()); - - new SqlSelector(getExpSchema().getScope(), sql).forEachBatch(Material.class, 1000, batch -> { - for (Material m : batch) - { - ExpMaterialImpl impl = new ExpMaterialImpl(m); - impl.index(queue, null /* null tableInfo since samples may belong to multiple containers*/); - } + sql = getExpSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT); + SqlSelector selector = new SqlSelector(getExpSchema().getScope(), sql); + selector.setJdbcCaching(false); + MutableLong maxRowIdProcessed = new MutableLong(minRowId); + + // Work in modest block sizes and fetch as a list so we don't keep the ResultSet open, which could lock the tables + List materials = selector.getArrayList(Material.class); + materials.forEach(m -> { + ExpMaterialImpl impl = new ExpMaterialImpl(m); + impl.index(queue, null /* null tableInfo since samples may belong to multiple containers*/); + maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), impl.getRowId())); }); + + if (materials.size() == SearchService.INDEXING_LIMIT) + { + // Requeue for the next batch. This avoids overwhelming the indexer's queue with documents + queue.addRunnable((q) -> indexSampleTypeMaterials(sampleType, q, maxRowIdProcessed.longValue())); + } }