Skip to content

Commit

Permalink
Integrate end to end hashed chain exchange with F3
Browse files Browse the repository at this point in the history
Integrate the chain exchange mechanism with F3 host and runner. But
without touching the core GPBFT.

The implementation here leaves two major TODOs: 1) chain broadcasting
mechanism (currently coupled to GPBFT message broadcast), and 2)
partial message validation prior to buffering (currently skipped
entirely but with capped buffer sizes and re-validation by core GPBFT
once the messages are complete).

The integration introduces the concept of Partial GMessage: a GMessage
with chains replaced with the key to the chain. The work introduces a
buffer and refill mechanism that listens to the chains discovered,
un-buffers the messages having re-constructed their original GMessage
and feeds them to the participation using the existing event loop.

Part of #792
  • Loading branch information
masih committed Jan 10, 2025
1 parent 3507ce2 commit de7a49c
Show file tree
Hide file tree
Showing 7 changed files with 551 additions and 29 deletions.
122 changes: 122 additions & 0 deletions cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions chainexchange/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (p *PubSubChainExchange) Start(ctx context.Context) error {
}
if p.topicScoreParams != nil {
if err := p.topic.SetScoreParams(p.topicScoreParams); err != nil {
return fmt.Errorf("failed to set score params: %w", err)
// This can happen most likely due to router not supporting peer scoring. It's
// non-critical. Hence, the warning log.
log.Warnw("failed to set topic score params", "err", err)
}
}
subscription, err := p.topic.Subscribe(pubsub.WithBufferSize(p.subscriptionBufferSize))
Expand All @@ -79,7 +81,7 @@ func (p *PubSubChainExchange) Start(ctx context.Context) error {
for ctx.Err() == nil {
msg, err := subscription.Next(ctx)
if err != nil {
log.Debugw("failed to read nex message from subscription", "err", err)
log.Debugw("failed to read next message from subscription", "err", err)
continue
}
cmsg := msg.ValidatorData.(Message)
Expand All @@ -89,7 +91,9 @@ func (p *PubSubChainExchange) Start(ctx context.Context) error {
p.stop = func() error {
cancel()
subscription.Cancel()
return p.topic.Close()
_ = p.pubsub.UnregisterTopicValidator(p.topicName)
_ = p.topic.Close()
return nil
}
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"

"github.com/filecoin-project/go-f3"
"github.com/filecoin-project/go-f3/certexchange"
"github.com/filecoin-project/go-f3/certs"
"github.com/filecoin-project/go-f3/chainexchange"
Expand Down Expand Up @@ -45,6 +46,11 @@ func main() {
chainexchange.Message{},
)
})
eg.Go(func() error {
return gen.WriteTupleEncodersToFile("../cbor_gen.go", "f3",
f3.PartialGMessage{},
)
})
if err := eg.Wait(); err != nil {
fmt.Printf("Failed to complete cborg_gen: %v\n", err)
os.Exit(1)
Expand Down
109 changes: 95 additions & 14 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type gpbftRunner struct {

inputs gpbftInputs
msgEncoding gMessageEncoding
pmm *partialMessageManager
}

type roundPhase struct {
Expand Down Expand Up @@ -141,6 +142,12 @@ func newRunner(
} else {
runner.msgEncoding = &cborGMessageEncoding{}
}

runner.pmm, err = newPartialMessageManager(runner.Progress, ps, m)
if err != nil {
return nil, fmt.Errorf("creating partial message manager: %w", err)
}

return runner, nil
}

Expand All @@ -156,6 +163,11 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
return err
}

completedMessageQueue, err := h.pmm.Start(ctx)
if err != nil {
return err
}

finalityCertificates, unsubCerts := h.certStore.Subscribe()
select {
case c := <-finalityCertificates:
Expand Down Expand Up @@ -193,7 +205,7 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
default:
}

// Handle messages, finality certificates, and alarms
// Handle messages, completed messages, finality certificates, and alarms
select {
case c := <-finalityCertificates:
if err := h.receiveCertificate(c); err != nil {
Expand All @@ -219,6 +231,29 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
// errors.
log.Errorf("error when processing message: %+v", err)
}
case gmsg, ok := <-completedMessageQueue:
if !ok {
return fmt.Errorf("incoming completed message queue closed")
}
switch validatedMessage, err := h.participant.ValidateMessage(gmsg); {
case errors.Is(err, gpbft.ErrValidationInvalid):
log.Debugw("validation error while validating completed message", "err", err)
// TODO: Signal partial message manager to penalise sender,
// e.g. reduce the total number of messages stroed from sender?
case errors.Is(err, gpbft.ErrValidationTooOld):
// TODO: Signal partial message manager to drop the instance?
case errors.Is(err, gpbft.ErrValidationNotRelevant):
// TODO: Signal partial message manager to drop irrelevant messages?
case errors.Is(err, gpbft.ErrValidationNoCommittee):
log.Debugw("committee error while validating completed message", "err", err)
case err != nil:
log.Errorw("unknown error while validating completed message", "err", err)
default:
recordValidatedMessage(ctx, validatedMessage)
if err := h.participant.ReceiveMessage(validatedMessage); err != nil {
log.Errorw("error while processing completed message", "err", err)
}
}
case <-h.runningCtx.Done():
return nil
}
Expand Down Expand Up @@ -452,7 +487,17 @@ func (h *gpbftRunner) BroadcastMessage(ctx context.Context, msg *gpbft.GMessage)
if h.topic == nil {
return pubsub.ErrTopicClosed
}
encoded, err := h.msgEncoding.Encode(msg)

if err := h.pmm.BroadcastChain(ctx, msg.Vote.Instance, msg.Vote.Value); err != nil {
// Silently log the error and continue. Partial message manager should take care of re-broadcast.
log.Warnw("failed to broadcast chain", "instance", msg.Vote.Instance, "error", err)
}

pmsg, err := h.pmm.toPartialGMessage(msg)
if err != nil {
return err
}
encoded, err := h.msgEncoding.Encode(pmsg)
if err != nil {
return fmt.Errorf("encoding GMessage for broadcast: %w", err)
}
Expand All @@ -472,7 +517,17 @@ func (h *gpbftRunner) rebroadcastMessage(msg *gpbft.GMessage) error {
if h.topic == nil {
return pubsub.ErrTopicClosed
}
encoded, err := h.msgEncoding.Encode(msg)

if err := h.pmm.BroadcastChain(h.runningCtx, msg.Vote.Instance, msg.Vote.Value); err != nil {
// Silently log the error and continue. Partial message manager should take care of re-broadcast.
log.Warnw("failed to rebroadcast chain", "instance", msg.Vote.Instance, "error", err)
}

pmsg, err := h.pmm.toPartialGMessage(msg)
if err != nil {
return err
}
encoded, err := h.msgEncoding.Encode(pmsg)
if err != nil {
return fmt.Errorf("encoding GMessage for broadcast: %w", err)
}
Expand All @@ -489,12 +544,28 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg
recordValidationTime(ctx, start, _result)
}(time.Now())

gmsg, err := h.msgEncoding.Decode(msg.Data)
pgmsg, err := h.msgEncoding.Decode(msg.Data)
if err != nil {
log.Debugw("failed to decode message", "from", msg.GetFrom(), "err", err)
return pubsub.ValidationReject
}

gmsg, completed := h.pmm.CompleteMessage(ctx, pgmsg)
if !completed {
// TODO: Partially validate the message because we can. To do this, however,
// message validator needs to be refactored to tolerate partial data.
// Hence, for now validation is postponed entirely until that refactor
// is done to accommodate partial messages.
// See: https://github.com/filecoin-project/go-f3/issues/813

// FIXME: must verify signature before buffering otherwise nodes can spoof the
// buffer with invalid messages on behalf of other peers as censorship
// attack.

msg.ValidatorData = pgmsg
return pubsub.ValidationAccept
}

switch validatedMessage, err := h.participant.ValidateMessage(gmsg); {
case errors.Is(err, gpbft.ErrValidationInvalid):
log.Debugf("validation error during validation: %+v", err)
Expand Down Expand Up @@ -588,15 +659,18 @@ func (h *gpbftRunner) startPubsub() (<-chan gpbft.ValidatedMessage, error) {
}
return fmt.Errorf("pubsub message subscription returned an error: %w", err)
}
gmsg, ok := msg.ValidatorData.(gpbft.ValidatedMessage)
if !ok {

switch gmsg := msg.ValidatorData.(type) {
case gpbft.ValidatedMessage:
select {
case messageQueue <- gmsg:
case <-h.runningCtx.Done():
return nil
}
case *PartialGMessage:
h.pmm.bufferPartialMessage(h.runningCtx, gmsg)
default:
log.Errorf("invalid msgValidatorData: %+v", msg.ValidatorData)
continue
}
select {
case messageQueue <- gmsg:
case <-h.runningCtx.Done():
return nil
}
}
return nil
Expand Down Expand Up @@ -632,18 +706,25 @@ func (h *gpbftHost) RequestRebroadcast(instant gpbft.Instant) error {
}

func (h *gpbftHost) GetProposal(instance uint64) (*gpbft.SupplementalData, gpbft.ECChain, error) {
return h.inputs.GetProposal(h.runningCtx, instance)
proposal, chain, err := h.inputs.GetProposal(h.runningCtx, instance)
if err == nil {
if err := h.pmm.BroadcastChain(h.runningCtx, instance, chain); err != nil {
log.Warnw("failed to broadcast chain", "instance", instance, "error", err)
}
}
return proposal, chain, err
}

func (h *gpbftHost) GetCommittee(instance uint64) (*gpbft.Committee, error) {
return h.inputs.GetCommittee(h.runningCtx, instance)
}

func (h *gpbftRunner) Stop(context.Context) error {
func (h *gpbftRunner) Stop(ctx context.Context) error {
h.ctxCancel()
return multierr.Combine(
h.wal.Close(),
h.errgrp.Wait(),
h.pmm.Shutdown(ctx),
h.teardownPubsub(),
)
}
Expand Down
Loading

0 comments on commit de7a49c

Please sign in to comment.