Skip to content

Commit

Permalink
add on chain details
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr committed Jan 7, 2025
1 parent 76c97e5 commit 7a6b62c
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 84 deletions.
20 changes: 12 additions & 8 deletions web/api/webrpc/pipeline_porep.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ type PipelineTask struct {
AfterSDR bool `db:"after_sdr"`
StartedSDR bool `db:"started_sdr"`

TaskTreeD *int64 `db:"task_id_tree_d"`
AfterTreeD bool `db:"after_tree_d"`
StartedTreeD bool `db:"started_tree_d"`
TaskTreeD *int64 `db:"task_id_tree_d"`
AfterTreeD bool `db:"after_tree_d"`
StartedTreeD bool `db:"started_tree_d"`
TreeD *string `db:"tree_d_cid"`

TaskTreeC *int64 `db:"task_id_tree_c"`
AfterTreeC bool `db:"after_tree_c"`
StartedTreeRC bool `db:"started_tree_rc"`

TaskTreeR *int64 `db:"task_id_tree_r"`
AfterTreeR bool `db:"after_tree_r"`
TaskTreeR *int64 `db:"task_id_tree_r"`
AfterTreeR bool `db:"after_tree_r"`
TreeR *string `db:"tree_r_cid"`

TaskSynthetic *int64 `db:"task_id_synth"`
AfterSynthetic bool `db:"after_synth"`
Expand All @@ -45,8 +47,9 @@ type PipelineTask struct {
AfterPrecommitMsg bool `db:"after_precommit_msg"`
StartedPrecommitMsg bool `db:"started_precommit_msg"`

AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"`
SeedEpoch *int64 `db:"seed_epoch"`
AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"`
PreCommitMsgCid *string `db:"precommit_msg_cid"`
SeedEpoch *int64 `db:"seed_epoch"`

TaskPoRep *int64 `db:"task_id_porep"`
PoRepProof []byte `db:"porep_proof"`
Expand All @@ -65,7 +68,8 @@ type PipelineTask struct {
AfterCommitMsg bool `db:"after_commit_msg"`
StartedCommitMsg bool `db:"started_commit_msg"`

AfterCommitMsgSuccess bool `db:"after_commit_msg_success"`
AfterCommitMsgSuccess bool `db:"after_commit_msg_success"`
CommitMsgCid *string `db:"commit_msg_cid"`

Failed bool `db:"failed"`
FailedReason string `db:"failed_reason"`
Expand Down
218 changes: 191 additions & 27 deletions web/api/webrpc/sector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/docker/go-units"
"github.com/filecoin-project/go-state-types/big"
"github.com/samber/lo"
"github.com/snadrus/must"
"golang.org/x/xerrors"
Expand All @@ -19,9 +21,27 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)

const verifiedPowerGainMul = 9

type SectorInfo struct {
SectorNumber int64
SpID uint64
SectorNumber int64
SpID uint64
Miner string
PreCommitMsg string
CommitMsg string
ActivationEpoch abi.ChainEpoch
ExpirationEpoch *int64
DealWeight string
Deadline *int64
Partition *int64
UnsealedCid string
SealedCid string
UpdatedUnsealedCid string
UpdatedSealedCid string
IsSnap bool
UpdateMsg string
UnsealedState bool

PipelinePoRep *sectorListEntry
PipelineSnap *sectorSnapListEntry

Expand Down Expand Up @@ -49,14 +69,15 @@ type SnapPipelineTask struct {
UpdateUnsealedCID *string `db:"update_unsealed_cid"`
UpdateSealedCID *string `db:"update_sealed_cid"`

TaskEncode *int64 `db:"task_id_encode"`
AfterEncode bool `db:"after_encode"`
TaskProve *int64 `db:"task_id_prove"`
AfterProve bool `db:"after_prove"`
TaskSubmit *int64 `db:"task_id_submit"`
AfterSubmit bool `db:"after_submit"`
AfterProveMsgSuccess bool `db:"after_prove_msg_success"`
ProveMsgTsk []byte `db:"prove_msg_tsk"`
TaskEncode *int64 `db:"task_id_encode"`
AfterEncode bool `db:"after_encode"`
TaskProve *int64 `db:"task_id_prove"`
AfterProve bool `db:"after_prove"`
TaskSubmit *int64 `db:"task_id_submit"`
AfterSubmit bool `db:"after_submit"`
AfterProveMsgSuccess bool `db:"after_prove_msg_success"`
ProveMsgTsk []byte `db:"prove_msg_tsk"`
UpdateMsgCid *string `db:"prove_msg_cid"`

TaskMoveStorage *int64 `db:"task_id_move_storage"`
AfterMoveStorage bool `db:"after_move_storage"`
Expand Down Expand Up @@ -135,6 +156,26 @@ type LocationTable struct {
Locations []FileLocations
}

type SectorMeta struct {
OrigUnsealedCid string `db:"orig_unsealed_cid"`
OrigSealedCid string `db:"orig_unsealed_cid"`

UpdatedUnsealedCid string `db:"cur_unsealed_cid"`
UpdatedSealedCid string `db:"cur_sealed_cid"`

PreCommitCid string `db:"msg_cid_precommit"`
CommitCid string `db:"msg_cid_commit"`
UpdateCid string `db:"msg_cid_update"`

IsCC bool `db:"is_cc"`
ExpirationEpoch *int64 `db:"expiration_epoch"`

Deadline *int64 `db:"deadline"`
Partition *int64 `db:"partition"`

UnsealedState bool `db:"target_unseal_state"`
}

func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*SectorInfo, error) {

maddr, err := address.NewFromString(sp)
Expand All @@ -147,24 +188,30 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto
return nil, xerrors.Errorf("invalid sp")
}

si := &SectorInfo{
SpID: spid,
Miner: maddr.String(),
SectorNumber: intid,
}

var tasks []PipelineTask

// Fetch PoRep pipeline data
err = a.deps.DB.Select(ctx, &tasks, `SELECT
sp_id, sector_number,
create_time,
task_id_sdr, after_sdr,
task_id_tree_d, after_tree_d,
task_id_tree_d, after_tree_d, tree_d_cid,
task_id_tree_c, after_tree_c,
task_id_tree_r, after_tree_r,
task_id_tree_r, after_tree_r, tree_r_cid,
task_id_synth, after_synth,
task_id_precommit_msg, after_precommit_msg,
after_precommit_msg_success, seed_epoch,
after_precommit_msg_success, precommit_msg_cid, seed_epoch,
task_id_porep, porep_proof, after_porep,
task_id_finalize, after_finalize,
task_id_move_storage, after_move_storage,
task_id_commit_msg, after_commit_msg,
after_commit_msg_success,
after_commit_msg_success, commit_msg_cid,
failed, failed_reason
FROM sectors_sdr_pipeline WHERE sp_id = $1 AND sector_number = $2`, spid, intid)
if err != nil {
Expand All @@ -185,7 +232,7 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto
task_id_prove, after_prove,
task_id_submit, after_submit,
after_prove_msg_success, prove_msg_tsk,
task_id_move_storage, after_move_storage,
task_id_move_storage, after_move_storage, prove_msg_cid,
failed, failed_at, failed_reason, failed_reason_msg,
submit_after
FROM sectors_snap_pipeline WHERE sp_id = $1 AND sector_number = $2`, spid, intid)
Expand All @@ -207,6 +254,33 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto
var sle *sectorListEntry
if len(tasks) > 0 {
task := tasks[0]
if task.PreCommitMsgCid != nil {
si.PreCommitMsg = *task.PreCommitMsgCid
} else {
si.PreCommitMsg = ""
}

if task.CommitMsgCid != nil {
si.CommitMsg = *task.CommitMsgCid
} else {
si.CommitMsg = ""
}

if task.TreeD != nil {
si.UnsealedCid = *task.TreeD
si.UpdatedUnsealedCid = *task.TreeD
} else {
si.UnsealedCid = ""
si.UpdatedUnsealedCid = ""
}

if task.TreeR != nil {
si.SealedCid = *task.TreeR
si.UpdatedSealedCid = *task.TreeR
} else {
si.SealedCid = ""
si.UpdatedSealedCid = ""
}
sle = &sectorListEntry{
PipelineTask: tasks[0],
AfterSeed: task.SeedEpoch != nil && *task.SeedEpoch <= int64(epoch),
Expand All @@ -223,6 +297,22 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto
var sleSnap *sectorSnapListEntry
if len(snapTasks) > 0 {
task := snapTasks[0]
if task.UpdateUnsealedCID != nil {
si.UpdatedUnsealedCid = *task.UpdateUnsealedCID
} else {
si.UpdatedUnsealedCid = ""
}
if task.UpdateUnsealedCID != nil {
si.UpdatedSealedCid = *task.UpdateUnsealedCID
} else {
si.UpdatedSealedCid = ""
}
if task.UpdateMsgCid != nil {
si.UpdateMsg = *task.UpdateMsgCid
} else {
si.UpdateMsg = ""
}
si.IsSnap = true
sleSnap = &sectorSnapListEntry{
SnapPipelineTask: task,
}
Expand Down Expand Up @@ -307,6 +397,43 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto

}

var sectorMetas []SectorMeta

// Fetch SectorMeta from DB
err = a.deps.DB.Select(ctx, &sectorMetas, `SELECT orig_sealed_cid,
orig_unsealed_cid, cur_sealed_cid, cur_unsealed_cid,
msg_cid_precommit, msg_cid_commit, msg_cid_update,
expiration_epoch, deadline, partition, target_unseal_state,
is_cc FROM sectors_sdr_meta
WHERE sp_id = $1 AND sector_number = $2`, spid, intid)
if err != nil {
return nil, xerrors.Errorf("failed to fetch sector metadata: %w", err)
}

if len(sectorMetas) > 0 {
sectormeta := sectorMetas[0]
si.UnsealedCid = sectormeta.OrigUnsealedCid
si.SealedCid = sectormeta.OrigSealedCid
si.UpdatedUnsealedCid = sectormeta.UpdatedUnsealedCid
si.UpdatedSealedCid = sectormeta.UpdatedSealedCid
si.PreCommitMsg = sectormeta.PreCommitCid
si.CommitMsg = sectormeta.CommitCid
si.UpdateMsg = sectormeta.UpdateCid
si.IsSnap = !sectormeta.IsCC
if sectormeta.ExpirationEpoch != nil {
si.ExpirationEpoch = sectormeta.ExpirationEpoch
}
if sectormeta.Deadline != nil {
d := *sectormeta.Deadline
si.Deadline = &d
}
if sectormeta.Partition != nil {
p := *sectormeta.Partition
si.Partition = &p
}
si.UnsealedState = sectormeta.UnsealedState
}

var pieces []SectorPieceMeta

// Fetch PoRep pieces
Expand Down Expand Up @@ -509,20 +636,57 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto
}
}

return &SectorInfo{
SectorNumber: intid,
SpID: spid,
PipelinePoRep: sle,
PipelineSnap: sleSnap,
onChainInfo, err := a.deps.Chain.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(intid), types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("failed to get on chain info for the sector: %w", err)
}

dw, vp := .0, .0
dealWeight := "CC"
{
rdw := big.Add(onChainInfo.DealWeight, onChainInfo.VerifiedDealWeight)
dw = float64(big.Div(rdw, big.NewInt(int64(onChainInfo.Expiration-onChainInfo.PowerBaseEpoch))).Uint64())
vp = float64(big.Div(big.Mul(onChainInfo.VerifiedDealWeight, big.NewInt(verifiedPowerGainMul)), big.NewInt(int64(onChainInfo.Expiration-onChainInfo.PowerBaseEpoch))).Uint64())
if vp > 0 {
dw = vp
}
if dw > 0 {
dealWeight = units.BytesSize(dw)
}
}

if si.Deadline == nil || si.Partition == nil {
part, err := a.deps.Chain.StateSectorPartition(ctx, maddr, abi.SectorNumber(intid), types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("failed to get partition info for the sector: %w", err)
}

d := int64(part.Deadline)
si.Deadline = &d

p := int64(part.Partition)
si.Partition = &p
}

si.ActivationEpoch = onChainInfo.Activation
if si.ExpirationEpoch == nil || *si.ExpirationEpoch != int64(onChainInfo.Expiration) {
expr := int64(onChainInfo.Expiration)
si.ExpirationEpoch = &expr
}
si.DealWeight = dealWeight

si.PipelinePoRep = sle
si.PipelineSnap = sleSnap

si.Pieces = pieces
si.Locations = locs
si.Tasks = htasks
si.TaskHistory = th

Pieces: pieces,
Locations: locs,
Tasks: htasks,
TaskHistory: th,
si.Resumable = hasAnyStuckTask
si.Restart = hasAnyStuckTask && (sle == nil || !sle.AfterSynthetic)

Resumable: hasAnyStuckTask,
Restart: hasAnyStuckTask && (sle == nil || !sle.AfterSynthetic),
}, nil
return si, nil
}

func (a *WebRPC) SectorResume(ctx context.Context, spid, id int64) error {
Expand Down
Loading

0 comments on commit 7a6b62c

Please sign in to comment.