Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/net/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def data_received(self, data):
self.unpause('max_in_flight')
if self.in_flight_requests:
next_timeout_at = self.in_flight_requests[0][3]
self.net.reschedule(self._timeout_task, next_timeout_at)
self.net.reschedule(next_timeout_at, self._timeout_task)
else:
self.net.unschedule(self._timeout_task)
self._timeout_task = None
Expand Down
10 changes: 7 additions & 3 deletions kafka/net/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,11 @@ def call_soon(self, task):
return task

def unschedule(self, task):
self._scheduled.remove((task.scheduled_at, task))
if task.scheduled_at is not None:
self._scheduled.remove((task.scheduled_at, task))
task.scheduled_at = None

def reschedule(self, task, when):
def reschedule(self, when, task):
self.unschedule(task)
self.call_at(when, task)
return task
Expand Down Expand Up @@ -215,7 +217,9 @@ def _wait_read(self, fileobj):

def _schedule_tasks(self):
while self._scheduled and self._scheduled[0][0] <= time.monotonic():
self._ready.append(heapq.heappop(self._scheduled)[1])
_, task = heapq.heappop(self._scheduled)
task.scheduled_at = None
self._ready.append(task)

def _next_scheduled_timeout(self, now):
try:
Expand Down
20 changes: 19 additions & 1 deletion test/net/test_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,32 @@ def task():
assert len(net._scheduled) == 1
net.unschedule(t)
assert len(net._scheduled) == 0
assert t.scheduled_at is None

def test_unschedule_unscheduled(self):
net = NetworkSelector()
def task():
yield
assert len(net._scheduled) == 0
net.unschedule(Task(task))
assert len(net._scheduled) == 0

def test_reschedule(self):
net = NetworkSelector()
def task():
yield
t = net.call_later(10, task)
new_when = time.monotonic() + 0.01
net.reschedule(t, new_when)
net.reschedule(new_when, t)
assert len(net._scheduled) == 1
assert net._scheduled[0][0] == new_when

def test_reschedule_unscheduled(self):
net = NetworkSelector()
def task():
yield
new_when = time.monotonic() + 0.01
net.reschedule(new_when, Task(task))
assert len(net._scheduled) == 1
assert net._scheduled[0][0] == new_when

Expand Down
Loading