Skip to content

Commit

Permalink
Add finalize API to EC backend (#642)
Browse files Browse the repository at this point in the history
Expand the F3 EC backend API to mark a given tipset as final. This
translates to checkpointing the tipset in Lotus beyond which no forks
are allowed.

Part of #603
  • Loading branch information
masih authored Sep 18, 2024
1 parent f49c874 commit cc35ea0
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 7 deletions.
11 changes: 10 additions & 1 deletion ec/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,21 @@ type Backend interface {
GetTipsetByEpoch(ctx context.Context, epoch int64) (TipSet, error)
// GetTipset returns the tipset with the given key.
GetTipset(context.Context, gpbft.TipSetKey) (TipSet, error)
// GetHead returns the current head tipset of the chain
// GetHead returns the current head tipset of the chain, which must be a
// descendant of the latest finalized tipset.
//
// See Finalize.
GetHead(context.Context) (TipSet, error)
// GetParent returns the parent of the current tipset.
GetParent(context.Context, TipSet) (TipSet, error)
// GetPowerTable returns the power table at the tipset given as an argument.
GetPowerTable(context.Context, gpbft.TipSetKey) (gpbft.PowerEntries, error)
// Finalize marks the tipset that corresponds to the given key as finalised
// beyond which no forks are allowed to occur. The finalised tipset overrides the
// head tipset if it is not an ancestor of the current head.
//
// See GetHead.
Finalize(context.Context, gpbft.TipSetKey) error
}

type TipSet interface {
Expand Down
49 changes: 43 additions & 6 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ import (
"github.com/filecoin-project/go-f3/internal/clock"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/manifest"
"go.opentelemetry.io/otel/metric"

pubsub "github.com/libp2p/go-libp2p-pubsub"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/metric"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -109,7 +108,6 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
}

finalityCertificates, unsubCerts := h.certStore.Subscribe()

h.errgrp.Go(func() (_err error) {
defer func() {
unsubCerts()
Expand Down Expand Up @@ -164,6 +162,45 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
}
return nil
})

// Asynchronously checkpoint the decided tipset keys by explicitly making a
// separate subscription to the cert store. This may cause a sync in a case where
// the finalized tipset is not already stored by the chain store, which is a
// blocking operation. Hence, the asynchronous checkpointing.
//
// Note, there is no guarantee that every finalized tipset will be checkpointed.
// Because:
// 1. the subscription only returns the latest certificate, i.e. may
// miss intermediate certificates, and
// 2. errors that may occur during checkpointing are silently logged
// to allow checkpointing of future finality certificates.
//
// Triggering the checkpointing here means that certstore remains the sole source
// of truth in terms of tipsets that have been finalised.
finalize, unsubFinalize := h.certStore.Subscribe()
h.errgrp.Go(func() error {
defer unsubFinalize()
for h.runningCtx.Err() == nil {
select {
case <-h.runningCtx.Done():
return nil
case cert, ok := <-finalize:
if !ok {
// This should never happen according to certstore subscribe semantic. If it
// does, error loudly since the chances are the cause is a programmer error.
return errors.New("cert store subscription to finalize tipsets was closed unexpectedly")
}
key := cert.ECChain.Head().Key
if err := h.ec.Finalize(h.runningCtx, key); err != nil {
// There is not much we can do here other than logging. The next instance start
// will effectively retry checkpointing the latest finalized tipset. This error
// will not impact the selection of next instance chain.
log.Error(fmt.Errorf("error while finalizing decision at EC: %w", err))
}
}
}
return nil
})
return nil
}

Expand Down Expand Up @@ -408,7 +445,7 @@ func (h *gpbftHost) collectChain(base ec.TipSet, head ec.TipSet) ([]ec.TipSet, e
return res[1:], nil
}

func (h *gpbftRunner) Stop(_ctx context.Context) error {
func (h *gpbftRunner) Stop(context.Context) error {
h.ctxCancel()
return multierr.Combine(
h.errgrp.Wait(),
Expand Down Expand Up @@ -604,7 +641,7 @@ func (h *gpbftHost) SetAlarm(at time.Time) {
// we cannot reuse the timer because we don't know if it was read or not
h.alertTimer.Stop()
if at.IsZero() {
// It "at" is zero, we cancel the timer entirely. Unfortunately, we still have to
// If "at" is zero, we cancel the timer entirely. Unfortunately, we still have to
// replace it for the reason stated above.
h.alertTimer = h.clock.Timer(0)
if !h.alertTimer.Stop() {
Expand Down
2 changes: 2 additions & 0 deletions internal/consensus/fake_ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,5 @@ func (ec *FakeEC) GetTipset(_ context.Context, tsk gpbft.TipSetKey) (ec.TipSet,
epoch := binary.BigEndian.Uint64(tsk[6+32-8 : 6+32])
return ec.genTipset(int64(epoch)), nil
}

func (ec *FakeEC) Finalize(context.Context, gpbft.TipSetKey) error { return nil }

0 comments on commit cc35ea0

Please sign in to comment.