-
Notifications
You must be signed in to change notification settings - Fork 2.8k
[ZEPPELIN-6406] Remove deprecated Flink 1.15/1.16/1.17 shims and add Flink 1.19/1.20 support #5205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
8a3fe07
cca6086
39e0ae2
3fe7a39
e798d02
72428e9
27d4d68
53c43de
bdbf8c7
69bfcdb
3482c42
9dc2b36
aa88f31
4ac40e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,7 @@ | |
| import org.apache.flink.streaming.experimental.SocketStreamIterator; | ||
| import org.apache.flink.table.api.Table; | ||
| import org.apache.flink.table.api.TableEnvironment; | ||
| import org.apache.flink.table.api.TableResult; | ||
| import org.apache.flink.table.api.TableSchema; | ||
| import org.apache.flink.table.sinks.RetractStreamTableSink; | ||
| import org.apache.flink.types.Row; | ||
|
|
@@ -65,6 +66,9 @@ public abstract class AbstractStreamSqlJob { | |
| protected InterpreterContext context; | ||
| protected TableSchema schema; | ||
| protected SocketStreamIterator<Tuple2<Boolean, Row>> iterator; | ||
| private volatile TableResult insertResult; | ||
| private volatile boolean cancelled = false; | ||
| private volatile boolean cancelledWithSavepoint = false; | ||
| protected Object resultLock = new Object(); | ||
| protected volatile boolean enableToRefresh = true; | ||
| protected int defaultParallelism; | ||
|
|
@@ -151,7 +155,38 @@ public String run(Table table, String tableName) throws IOException { | |
|
|
||
| LOGGER.info("Run job: {}, parallelism: {}", tableName, parallelism); | ||
| String jobName = context.getStringLocalProperty("jobName", tableName); | ||
| table.executeInsert(tableName).await(); | ||
| this.insertResult = table.executeInsert(tableName); | ||
| // Register the job with JobManager so that cancel (with savepoint) works properly | ||
| if (insertResult.getJobClient().isPresent()) { | ||
| jobManager.addJob(context, insertResult.getJobClient().get()); | ||
| } | ||
| // Use a CountDownLatch to wait for job completion while supporting cancellation | ||
| java.util.concurrent.CountDownLatch jobDone = new java.util.concurrent.CountDownLatch(1); | ||
| Thread jobThread = new Thread(() -> { | ||
| try { | ||
| insertResult.await(); | ||
| } catch (Exception e) { | ||
| LOGGER.debug("Job await interrupted or failed", e); | ||
| } finally { | ||
| jobDone.countDown(); | ||
| } | ||
| }, "flink-job-await"); | ||
| jobThread.setDaemon(true); | ||
| jobThread.start(); | ||
|
|
||
| // Wait for either job completion or cancellation | ||
| while (!cancelled && !jobDone.await(1, java.util.concurrent.TimeUnit.SECONDS)) { | ||
| // keep waiting | ||
| } | ||
| if (cancelled) { | ||
| // Wait briefly for the job to finish (e.g. stopped with savepoint) | ||
| jobDone.await(10, java.util.concurrent.TimeUnit.SECONDS); | ||
| if (cancelledWithSavepoint) { | ||
| LOGGER.info("Stream sql job stopped with savepoint, jobName: {}", jobName); | ||
| return buildResult(); | ||
| } | ||
| throw new InterruptedException("Job was cancelled"); | ||
| } | ||
|
Comment on lines
+181
to
+189
|
||
| LOGGER.info("Flink Job is finished, jobName: {}", jobName); | ||
| // wait for retrieve thread consume all data | ||
| LOGGER.info("Waiting for retrieve thread to be done"); | ||
|
|
@@ -161,9 +196,22 @@ public String run(Table table, String tableName) throws IOException { | |
| LOGGER.info("Final Result: {}", finalResult); | ||
| return finalResult; | ||
| } catch (Exception e) { | ||
| if (cancelled) { | ||
| throw new IOException("Job was cancelled", e); | ||
| } | ||
| LOGGER.error("Fail to run stream sql job", e); | ||
| if (e instanceof IOException) { | ||
| throw (IOException) e; | ||
| } | ||
| throw new IOException("Fail to run stream sql job", e); | ||
| } finally { | ||
| if (insertResult != null && insertResult.getJobClient().isPresent()) { | ||
| try { | ||
| jobManager.removeJob(context.getParagraphId()); | ||
| } catch (Exception ex) { | ||
| LOGGER.warn("Failed to remove job from JobManager", ex); | ||
| } | ||
| } | ||
| refreshScheduler.shutdownNow(); | ||
| } | ||
| } | ||
|
|
@@ -238,6 +286,12 @@ public void cancel() { | |
| } | ||
| } | ||
|
|
||
| public void cancel(boolean withSavepoint) { | ||
| LOGGER.info("Canceling stream sql job, withSavepoint={}", withSavepoint); | ||
| this.cancelledWithSavepoint = withSavepoint; | ||
| this.cancelled = true; | ||
| } | ||
|
|
||
| protected abstract void refresh(InterpreterContext context) throws Exception; | ||
|
|
||
| private class RefreshTask implements Runnable { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`jobManager.addJob(...)` is called for the insert job, but this method stores a JobClient/poller keyed by paragraphId and expects `removeJob(...)` to be called to avoid stale entries. Since `run(...)` never removes the job on completion/cancellation, subsequent stream runs in the same paragraph can leave orphaned pollers and can skip starting a new poller due to the existing mapping. Add a `jobManager.removeJob(context.getParagraphId())` in a `finally` block (guarded by JobClient presence) once the job is done.