Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type Storage interface {

type Communication interface {
// Nodes returns all nodes that participate in the epoch.
Nodes() []NodeID
Nodes() Nodes

// Send sends a message to the given destination node
Send(msg *Message, destination NodeID)
Expand Down Expand Up @@ -130,8 +130,33 @@ type SignatureAggregator interface {
// Aggregate aggregates several signatures into a QuorumCertificate
Aggregate([]Signature) (QuorumCertificate, error)

// AppendSignatures appends signatures to an existing signature.
// If the existing signature is empty, it just aggregates the given signatures.
AppendSignatures([]byte, ...[]byte) ([]byte, error)

// IsQuorum returns true if the given signers constitute a quorum.
// In the case of PoA, this means at least a quorum of the nodes are given.
// In the case of PoS, this means at least two thirds of the st.
IsQuorum([]NodeID) bool
}

// Nodes is a list of Node elements.
type Nodes []Node

// NodeIDs returns the NodeIDs of the nodes in the Nodes.
func (nws Nodes) NodeIDs() []NodeID {
nodes := make([]NodeID, len(nws))
for i, nw := range nws {
nodes[i] = nw.Node
}
return nodes
}

// Node is a struct that pairs a node with its weight in the signature aggregator.
type Node struct {
Node NodeID
Weight uint64
}

// SignatureAggregatorCreator creates a SignatureAggregator from a list of nodes and their weights.
type SignatureAggregatorCreator func([]Node) SignatureAggregator
2 changes: 1 addition & 1 deletion blacklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (bl *Blacklist) ApplyUpdates(updates []BlacklistUpdate, round uint64) Black
}

// garbageCollectSuspectedNodes returns a new list of suspected nodes for the given round.
// Nodes that are no longer suspected or have been redeemed, will not be included in the returned suspected nodes.
// NodeIDs that are no longer suspected or have been redeemed, will not be included in the returned suspected nodes.
// It will also garbage-collect any redeem votes from past orbits, unless hey have surpassed the threshold of f+1.
// It does not modify the current blacklist.
func (bl *Blacklist) garbageCollectSuspectedNodes(round uint64) SuspectedNodes {
Expand Down
4 changes: 2 additions & 2 deletions blacklist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,8 @@ func TestComputeBlacklistUpdates(t *testing.T) {
func TestAdvanceRound(t *testing.T) {
nodes := []uint16{0, 1, 2, 3}

// Nodes 0, 2 are suspected.
// Nodes 1 and 3 are not suspected.
// NodeIDs 0, 2 are suspected.
// NodeIDs 1 and 3 are not suspected.
// Node 2 can be redeemed.
suspectedNodesBefore := SuspectedNodes{
{NodeIndex: 0, SuspectingCount: 2, OrbitSuspected: 1, RedeemingCount: 1, OrbitToRedeem: 1},
Expand Down
40 changes: 22 additions & 18 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type EpochConfig struct {
Signer Signer
Verifier SignatureVerifier
BlockDeserializer BlockDeserializer
SignatureAggregator SignatureAggregator
SignatureAggregatorCreator SignatureAggregatorCreator
Comm Communication
Storage Storage
WAL WriteAheadLog
Expand All @@ -83,6 +83,7 @@ type EpochConfig struct {
type Epoch struct {
EpochConfig
// Runtime
signatureAggregator SignatureAggregator
oneTimeVerifier *OneTimeVerifier
buildBlockScheduler *BasicScheduler
blockVerificationScheduler *BlockDependencyManager
Expand All @@ -94,6 +95,7 @@ type Epoch struct {
blockBuilderCtx context.Context
blockBuilderCancelFunc context.CancelFunc
nodes NodeIDs
nodeWeights Nodes
eligibleNodeIDs map[string]struct{}
rounds map[uint64]*Round
emptyVotes map[uint64]*EmptyVoteSet
Expand Down Expand Up @@ -198,8 +200,9 @@ func (e *Epoch) init() error {
e.finishCtx, e.finishFn = context.WithCancel(context.Background())
e.blockBuilderCtx = context.Background()
e.blockBuilderCancelFunc = func() {}
e.nodes = e.Comm.Nodes()
SortNodes(e.nodes)
e.nodeWeights = e.Comm.Nodes()
SortNodes(e.nodeWeights)
e.nodes = e.nodeWeights.NodeIDs()
e.timedOutRounds = make(map[uint16]uint64, len(e.nodes))
e.redeemedRounds = make(map[uint16]uint64, len(e.nodes))
e.rounds = make(map[uint64]*Round)
Expand All @@ -208,6 +211,7 @@ func (e *Epoch) init() error {
e.futureMessages = make(messagesFromNode, len(e.nodes))
e.replicationState = NewReplicationState(e.Logger, e.Comm, e.ID, e.MaxRoundWindow, e.ReplicationEnabled, e.StartTime, &e.lock, e.RandomSource)
e.timeoutHandler = NewTimeoutHandler(e.Logger, "emptyVoteRebroadcast", e.StartTime, e.MaxRebroadcastWait, e.emptyVoteTimeoutTaskRunner)
e.signatureAggregator = e.SignatureAggregatorCreator(e.nodeWeights)

for _, node := range e.nodes {
e.futureMessages[string(node)] = make(map[uint64]*messagesForRound)
Expand Down Expand Up @@ -739,7 +743,7 @@ func (e *Epoch) handleFinalizationMessage(message *Finalization, from NodeID) er
return nil
}

if err := VerifyQC(message.QC, e.Logger, "Finalization", e.SignatureAggregator.IsQuorum, e.eligibleNodeIDs, message, from); err != nil {
if err := VerifyQC(message.QC, e.Logger, "Finalization", e.signatureAggregator.IsQuorum, e.eligibleNodeIDs, message, from); err != nil {
e.Logger.Debug("Received an invalid finalization",
zap.Int("round", int(message.Finalization.Round)),
zap.Stringer("NodeID", from))
Expand Down Expand Up @@ -1159,7 +1163,7 @@ func (e *Epoch) maybeCollectFinalization(round *Round) error {
var finalizations []*FinalizeVote

for _, finalizationsWithTheSameDigest := range finalizationsByMD {
if e.SignatureAggregator.IsQuorum(NodeIDsFromVotes(finalizationsWithTheSameDigest)) {
if e.signatureAggregator.IsQuorum(NodeIDsFromVotes(finalizationsWithTheSameDigest)) {
finalizations = finalizationsWithTheSameDigest
break
}
Expand All @@ -1174,7 +1178,7 @@ func (e *Epoch) maybeCollectFinalization(round *Round) error {
}

func (e *Epoch) assembleFinalization(round *Round, finalizationVotes []*FinalizeVote) error {
finalization, err := NewFinalization(e.Logger, e.SignatureAggregator, finalizationVotes)
finalization, err := NewFinalization(e.Logger, e.signatureAggregator, finalizationVotes)
if err != nil {
return err
}
Expand Down Expand Up @@ -1387,13 +1391,13 @@ func (e *Epoch) maybeAssembleEmptyNotarization() error {
}

// Check if we found a quorum of votes for the same metadata
popularEmptyVote, signatures, found := findEmptyVoteThatIsQuorum(emptyVotes.votes, e.SignatureAggregator.IsQuorum)
popularEmptyVote, signatures, found := findEmptyVoteThatIsQuorum(emptyVotes.votes, e.signatureAggregator.IsQuorum)
if !found {
e.Logger.Debug("Could not find empty vote with a quorum or more votes", zap.Uint64("round", e.round))
return nil
}

qc, err := e.SignatureAggregator.Aggregate(signatures)
qc, err := e.signatureAggregator.Aggregate(signatures)
if err != nil {
e.Logger.Error("Could not aggregate empty votes signatures", zap.Error(err), zap.Uint64("round", e.round))
return nil
Expand Down Expand Up @@ -1500,15 +1504,15 @@ func (e *Epoch) maybeCollectNotarization() error {
}
}

if !e.SignatureAggregator.IsQuorum(NodeIDsFromVotes(votesForOurBlock)) {
if !e.signatureAggregator.IsQuorum(NodeIDsFromVotes(votesForOurBlock)) {
e.Logger.Debug("Not enough votes to form a notarization for our block",
zap.Uint64("round", e.round),
zap.Int("voteForOurBlock", len(votesForOurBlock)),
zap.Int("total votes", voteCount))
return nil
}

notarization, err := NewNotarization(e.Logger, e.SignatureAggregator, votesForCurrentRound, block.BlockHeader())
notarization, err := NewNotarization(e.Logger, e.signatureAggregator, votesForCurrentRound, block.BlockHeader())
if err != nil {
return err
}
Expand Down Expand Up @@ -1594,7 +1598,7 @@ func (e *Epoch) handleEmptyNotarizationMessage(emptyNotarization *EmptyNotarizat
}

// Otherwise, this round is not notarized or finalized yet, so verify the empty notarization and store it.
if err := VerifyQC(emptyNotarization.QC, e.Logger, "Empty notarization", e.SignatureAggregator.IsQuorum, e.eligibleNodeIDs, emptyNotarization, from); err != nil {
if err := VerifyQC(emptyNotarization.QC, e.Logger, "Empty notarization", e.signatureAggregator.IsQuorum, e.eligibleNodeIDs, emptyNotarization, from); err != nil {
return nil
}

Expand Down Expand Up @@ -1650,7 +1654,7 @@ func (e *Epoch) handleNotarizationMessage(message *Notarization, from NodeID) er
return nil
}

if err := VerifyQC(message.QC, e.Logger, "Notarization", e.SignatureAggregator.IsQuorum, e.eligibleNodeIDs, message, from); err != nil {
if err := VerifyQC(message.QC, e.Logger, "Notarization", e.signatureAggregator.IsQuorum, e.eligibleNodeIDs, message, from); err != nil {
return nil
}

Expand Down Expand Up @@ -3206,20 +3210,20 @@ func (e *Epoch) verifyQuorumRound(q QuorumRound, from NodeID) error {

if q.Finalization != nil {
// extra check needed if we have a finalized block
err := VerifyQC(q.Finalization.QC, e.Logger, "Finalization", e.SignatureAggregator.IsQuorum, e.eligibleNodeIDs, q.Finalization, from)
err := VerifyQC(q.Finalization.QC, e.Logger, "Finalization", e.signatureAggregator.IsQuorum, e.eligibleNodeIDs, q.Finalization, from)
if err != nil {
return errors.New("invalid finalization")
}
}

if q.Notarization != nil {
if err := VerifyQC(q.Notarization.QC, e.Logger, "Notarization", e.SignatureAggregator.IsQuorum, e.eligibleNodeIDs, q.Notarization, from); err != nil {
if err := VerifyQC(q.Notarization.QC, e.Logger, "Notarization", e.signatureAggregator.IsQuorum, e.eligibleNodeIDs, q.Notarization, from); err != nil {
return fmt.Errorf("invalid notarization: %v", err)
}
}

if q.EmptyNotarization != nil {
err := VerifyQC(q.EmptyNotarization.QC, e.Logger, "Empty notarization", e.SignatureAggregator.IsQuorum, e.eligibleNodeIDs, q.EmptyNotarization, from)
err := VerifyQC(q.EmptyNotarization.QC, e.Logger, "Empty notarization", e.signatureAggregator.IsQuorum, e.eligibleNodeIDs, q.EmptyNotarization, from)
if err != nil {
return fmt.Errorf("invalid empty notarization QC: %v", err)
}
Expand Down Expand Up @@ -3421,9 +3425,9 @@ func (e *Epoch) nextSeqToCommit() uint64 {
}

// SortNodes sorts the nodes in place by their byte representations.
func SortNodes(nodes []NodeID) {
slices.SortFunc(nodes, func(a, b NodeID) int {
return bytes.Compare(a[:], b[:])
func SortNodes(nodes Nodes) {
slices.SortFunc(nodes, func(a, b Node) int {
return bytes.Compare(a.Node[:], b.Node[:])
})
}

Expand Down
20 changes: 10 additions & 10 deletions epoch_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ func TestEpochLeaderFailoverWithEmptyNotarization(t *testing.T) {

func TestEpochRebroadcastsEmptyVoteAfterBlockProposalReceived(t *testing.T) {
bb := testutil.NewTestBlockBuilder()
nodes := []NodeID{{1}, {2}, {3}, {4}}
nodes := NodeIDs{{1}, {2}, {3}, {4}}

comm := newRebroadcastComm(nodes)
comm := newRebroadcastComm(nodes.EqualWeightedNodes())
conf, wal, _ := testutil.DefaultTestNodeEpochConfig(t, nodes[3], comm, bb)
epochTime := conf.StartTime
e, err := NewEpoch(conf)
Expand Down Expand Up @@ -349,12 +349,12 @@ func TestEpochLeaderFailoverDoNotPersistEmptyRoundTwice(t *testing.T) {
}

func TestEpochLeaderRecursivelyFetchNotarizedBlocks(t *testing.T) {
nodes := []NodeID{{1}, {2}, {3}, {4}}
nodes := NodeIDs{{1}, {2}, {3}, {4}}
bb := testutil.NewTestBlockBuilder()

recordedMessages := make(chan *Message, 100)

comm := &recordingComm{Communication: testutil.NoopComm(nodes), SentMessages: recordedMessages}
comm := &recordingComm{Communication: testutil.NoopComm(nodes.EqualWeightedNodes()), SentMessages: recordedMessages}
conf, wal, _ := testutil.DefaultTestNodeEpochConfig(t, nodes[0], comm, bb)

e, err := NewEpoch(conf)
Expand Down Expand Up @@ -1115,18 +1115,18 @@ func TestEpochBlacklist(t *testing.T) {
}

type rebroadcastComm struct {
nodes []NodeID
nodes Nodes
emptyVotes chan *EmptyVote
}

func newRebroadcastComm(nodes []NodeID) *rebroadcastComm {
func newRebroadcastComm(nodes Nodes) *rebroadcastComm {
return &rebroadcastComm{
nodes: nodes,
emptyVotes: make(chan *EmptyVote, 10),
}
}

func (r *rebroadcastComm) Nodes() []NodeID {
func (r *rebroadcastComm) Nodes() Nodes {
return r.nodes
}

Expand All @@ -1142,9 +1142,9 @@ func (r *rebroadcastComm) Broadcast(msg *Message) {

func TestEpochRebroadcastsEmptyVote(t *testing.T) {
bb := testutil.NewTestBlockBuilder()
nodes := []NodeID{{1}, {2}, {3}, {4}}
nodes := NodeIDs{{1}, {2}, {3}, {4}}

comm := newRebroadcastComm(nodes)
comm := newRebroadcastComm(nodes.EqualWeightedNodes())
conf, wal, _ := testutil.DefaultTestNodeEpochConfig(t, nodes[3], comm, bb)
epochTime := conf.StartTime
e, err := NewEpoch(conf)
Expand Down Expand Up @@ -1227,7 +1227,7 @@ func runCrashAndRestartExecution(t *testing.T, e *Epoch, bb *testutil.TestBlockB

// Case 2:
t.Run(fmt.Sprintf("%s-with-crash", t.Name()), func(t *testing.T) {
conf, _, _ := testutil.DefaultTestNodeEpochConfig(t, nodes[0], testutil.NewNoopComm(nodes), bbAfterCrash)
conf, _, _ := testutil.DefaultTestNodeEpochConfig(t, nodes[0].Node, testutil.NewNoopComm(nodes.NodeIDs()), bbAfterCrash)
conf.Storage = cloneStorage
conf.WAL = cloneWAL

Expand Down
Loading
Loading