Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions dsqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type DSQueue struct {
enqueue chan []byte
clear chan chan<- int
closeTimeout time.Duration
empty chan struct{}
getn chan getRequest
name string
}
Expand All @@ -62,6 +63,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue {
enqueue: make(chan []byte),
clear: make(chan chan<- int),
closeTimeout: cfg.closeTimeout,
empty: make(chan struct{}),
getn: make(chan getRequest),
name: name,
}
Expand Down Expand Up @@ -94,6 +96,13 @@ func (q *DSQueue) Close() error {
return err
}

// Empty returns a channel that is signaled when the queue is empty. This is
// useful for exiting select when there are currently no more queued items to
// read.
func (q *DSQueue) Empty() <-chan struct{} {
return q.empty
}

// Put puts an item into the queue.
func (q *DSQueue) Put(item []byte) (err error) {
if len(item) == 0 {
Expand Down Expand Up @@ -222,6 +231,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
var (
commit bool
dsEmpty bool
empty chan struct{}
err error
idle bool
)
Expand Down Expand Up @@ -271,6 +281,9 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
var dequeue chan []byte
if item != nil {
dequeue = q.dequeue
empty = nil
} else {
empty = q.empty
}

select {
Expand Down Expand Up @@ -301,6 +314,8 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
if bufferSize != 0 && inBuf.Len() >= bufferSize {
commit = true
}
case empty <- struct{}{}:

case getRequest := <-q.getn:
n := getRequest.n
rspChan := getRequest.rsp
Expand Down
63 changes: 63 additions & 0 deletions dsqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,69 @@ func TestBasicOperation(t *testing.T) {
}
}

func TestReadUntilEmpty(t *testing.T) {
ds := sync.MutexWrap(datastore.NewMapDatastore())
q := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithDedupCacheSize(0))
defer q.Close()

cids := random.Cids(29)
for _, c := range cids {
q.Put(c.Bytes())
}

var i int

loop:
for {
select {
case outItem := <-q.Out():
outCid, err := cid.Parse(outItem)
if err != nil {
t.Fatal(err)
}
if outCid != cids[i] {
t.Fatal("retrieved items out of order")
}
i++
case <-q.Empty():
break loop
}
}

if i != len(cids) {
t.Fatalf("dequeued wrond number of items, expected %d, got %d", len(cids), i)
}

// Check still empty.
select {
case <-q.Empty():
case <-q.Out():
t.Fatal("should not have any more data")
case <-time.After(time.Second):
t.Fatal("did not receive empty signal")
}

q.Put(cids[0].Bytes())

// Check for data item.
select {
case <-q.Out():
case <-q.Empty():
t.Fatal("should not have received empty signal")
case <-time.After(time.Second):
t.Fatal("did not receive any data")
}

// Check for empty.
select {
case <-q.Empty():
case <-q.Out():
t.Fatal("should not have any more data")
case <-time.After(time.Second):
t.Fatal("did not receive empty signal")
}
}

func TestGetN(t *testing.T) {
ds := sync.MutexWrap(datastore.NewMapDatastore())
queue := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithDedupCacheSize(0))
Expand Down
Loading