diff --git a/lib/ffi/piece_funcs.go b/lib/ffi/piece_funcs.go
index f09a3c26b..4723756b2 100644
--- a/lib/ffi/piece_funcs.go
+++ b/lib/ffi/piece_funcs.go
@@ -15,7 +15,7 @@ import (
 func (sb *SealCalls) WritePiece(ctx context.Context, taskID *harmonytask.TaskID, pieceID storiface.PieceNumber, size int64, data io.Reader) error {
     // todo: config(?): allow setting PathStorage for this
     // todo storage reservations
-    paths, _, done, err := sb.sectors.AcquireSector(ctx, taskID, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storiface.PathSealing)
+    paths, _, done, err := sb.Sectors.AcquireSector(ctx, taskID, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storiface.PathSealing)
     if err != nil {
         return err
     }
@@ -68,9 +68,9 @@ func (sb *SealCalls) WritePiece(ctx context.Context, taskID *harmonytask.TaskID,
 }
 
 func (sb *SealCalls) PieceReader(ctx context.Context, id storiface.PieceNumber) (io.ReadCloser, error) {
-    return sb.sectors.storage.ReaderSeq(ctx, id.Ref(), storiface.FTPiece)
+    return sb.Sectors.storage.ReaderSeq(ctx, id.Ref(), storiface.FTPiece)
 }
 
 func (sb *SealCalls) RemovePiece(ctx context.Context, id storiface.PieceNumber) error {
-    return sb.sectors.storage.Remove(ctx, id.Ref().ID, storiface.FTPiece, true, nil)
+    return sb.Sectors.storage.Remove(ctx, id.Ref().ID, storiface.FTPiece, true, nil)
 }
diff --git a/lib/ffi/scrub_funcs.go b/lib/ffi/scrub_funcs.go
index e18d92c60..2091d641e 100644
--- a/lib/ffi/scrub_funcs.go
+++ b/lib/ffi/scrub_funcs.go
@@ -18,7 +18,7 @@ import (
 )
 
 func (sb *SealCalls) CheckUnsealedCID(ctx context.Context, s storiface.SectorRef) (cid.Cid, error) {
-    reader, err := sb.sectors.storage.ReaderSeq(ctx, s, storiface.FTUnsealed)
+    reader, err := sb.Sectors.storage.ReaderSeq(ctx, s, storiface.FTUnsealed)
     if err != nil {
         return cid.Undef, xerrors.Errorf("getting unsealed sector reader: %w", err)
     }
diff --git a/lib/ffi/sdr_funcs.go b/lib/ffi/sdr_funcs.go
index 3a8330d13..4aa2599f1 100644
--- a/lib/ffi/sdr_funcs.go
+++ b/lib/ffi/sdr_funcs.go
@@ -5,6 +5,7 @@ import (
     "crypto/rand"
     "encoding/json"
     "fmt"
+    "github.com/samber/lo"
     "io"
     "os"
     "path/filepath"
@@ -43,7 +44,7 @@ type ExternPrecommit2 func(ctx context.Context, sector storiface.SectorRef, cach
 }
 */
 type SealCalls struct {
-    sectors *storageProvider
+    Sectors *storageProvider
 
     /*// externCalls cointain overrides for calling alternative sealing logic
     externCalls ExternalSealer*/
 }
 
 func NewSealCalls(st *paths.Remote, ls *paths.Local, si paths.SectorIndex) *SealCalls {
     return &SealCalls{
-        sectors: &storageProvider{
+        Sectors: &storageProvider{
             storage:             st,
             localStore:          ls,
             sindex:              si,
-            storageReservations: xsync.NewIntegerMapOf[harmonytask.TaskID, *StorageReservation](),
+            storageReservations: xsync.NewIntegerMapOf[harmonytask.TaskID, []*StorageReservation](),
         },
     }
 }
@@ -64,7 +65,7 @@ type storageProvider struct {
     storage             *paths.Remote
     localStore          *paths.Local
     sindex              paths.SectorIndex
-    storageReservations *xsync.MapOf[harmonytask.TaskID, *StorageReservation]
+    storageReservations *xsync.MapOf[harmonytask.TaskID, []*StorageReservation]
 }
 
 func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask.TaskID, sector storiface.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType) (fspaths, ids storiface.SectorPaths, release func(dontDeclare ...storiface.SectorFileType), err error) {
@@ -74,7 +75,12 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
     var ok bool
     var resv *StorageReservation
     if taskID != nil {
-        resv, ok = l.storageReservations.Load(*taskID)
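+        // A task may hold reservations for several sectors at once (batch
+        // sealing), so pick the reservation matching this particular sector
+        // rather than assuming one reservation per task.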
+        var resvs []*StorageReservation
+        resvs, ok = l.storageReservations.Load(*taskID)
+        if ok {
+            resv, ok = lo.Find(resvs, func(res *StorageReservation) bool {
+                return res.SectorRef.ID() == sector.ID
+            })
+        }
     }
     if ok && resv != nil {
         if resv.Alloc != allocate || resv.Existing != existing {
@@ -144,7 +150,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
 }
 
 func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID, into storiface.SectorFileType, sector storiface.SectorRef, ticket abi.SealRandomness, commDcid cid.Cid) error {
-    paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, into, storiface.PathSealing)
+    paths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, into, storiface.PathSealing)
     if err != nil {
         return xerrors.Errorf("acquiring sector paths: %w", err)
     }
@@ -223,7 +229,7 @@ func (sb *SealCalls) ensureOneCopy(ctx context.Context, sid abi.SectorID, pathID
 
     log.Debugw("ensureOneCopy", "sector", sid, "type", fileType, "keep", keepIn)
 
-    if err := sb.sectors.storage.Remove(ctx, sid, fileType, true, keepIn); err != nil {
+    if err := sb.Sectors.storage.Remove(ctx, sid, fileType, true, keepIn); err != nil {
         return err
     }
 }
@@ -237,7 +243,7 @@ func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, secto
         return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err)
     }
 
-    fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
+    fspaths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
     if err != nil {
         return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
     }
@@ -352,7 +358,7 @@ func (sb *SealCalls) GenerateSynthPoRep() {
 }
 
 func (sb *SealCalls) PoRepSnark(ctx context.Context, sn storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error) {
-    vproof, err := sb.sectors.storage.GeneratePoRepVanillaProof(ctx, sn, sealed, unsealed, ticket, seed)
+    vproof, err := sb.Sectors.storage.GeneratePoRepVanillaProof(ctx, sn, sealed, unsealed, ticket, seed)
     if err != nil {
         return nil, xerrors.Errorf("failed to generate vanilla proof: %w", err)
     }
@@ -498,7 +504,7 @@ func (sb *SealCalls) makePhase1Out(unsCid cid.Cid, spt abi.RegisteredSealProof)
 }
 
 func (sb *SealCalls) LocalStorage(ctx context.Context) ([]storiface.StoragePath, error) {
-    return sb.sectors.localStore.Local(ctx)
+    return sb.Sectors.localStore.Local(ctx)
 }
 
 func changePathType(path string, newType storiface.SectorFileType) (string, error) {
@@ -526,7 +532,7 @@ func changePathType(path string, newType storiface.SectorFileType) (string, erro
     return newPath, nil
 }
 func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed bool) error {
-    sectorPaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
+    sectorPaths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
     if err != nil {
         return xerrors.Errorf("acquiring sector paths: %w", err)
     }
@@ -548,7 +554,7 @@ func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.Sector
         defer func() {
             // We don't pass FTUnsealed to Acquire, so releaseSector won't declare it. Do it here.
-            if err := sb.sectors.sindex.StorageDeclareSector(ctx, storiface.ID(pathIDs.Unsealed), sector.ID, storiface.FTUnsealed, true); err != nil {
+            if err := sb.Sectors.sindex.StorageDeclareSector(ctx, storiface.ID(pathIDs.Unsealed), sector.ID, storiface.FTUnsealed, true); err != nil {
                 log.Errorf("declare unsealed sector error: %+v", err)
             }
         }()
@@ -666,11 +672,16 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
 
     var opts []storiface.AcquireOption
     if taskID != nil {
-        resv, ok := sb.sectors.storageReservations.Load(*taskID)
+        resvs, ok := sb.Sectors.storageReservations.Load(*taskID)
         // if the reservation is missing MoveStorage will simply create one internally. This is fine as the reservation
         // will only be missing when the node is restarting, which means that the missing reservations will get recreated
         // anyways, and before we start claiming other tasks.
         if ok {
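+            // MoveStorage handles a single sector per task, so anything other
+            // than exactly one reservation for this task is an error.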
+            if len(resvs) != 1 {
+                return xerrors.Errorf("task %d has %d reservations, expected 1", *taskID, len(resvs))
+            }
+            resv := resvs[0]
+
             defer resv.Release()
 
             if resv.Alloc != storiface.FTNone {
@@ -684,13 +695,13 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
         }
     }
 
-    err := sb.sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
+    err := sb.Sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
     if err != nil {
         return xerrors.Errorf("moving storage: %w", err)
     }
 
     for _, fileType := range toMove.AllSet() {
-        if err := sb.sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
+        if err := sb.Sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
             return xerrors.Errorf("rm copies (t:%s, s:%v): %w", fileType, sector, err)
         }
     }
@@ -699,7 +710,7 @@ func (sb *SealCalls) MoveStorage(ctx context.Context, sector storiface.SectorRef
 }
 
 func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.SectorRef, ft storiface.SectorFileType) (sectorFound bool, ptype storiface.PathType, err error) {
-    stores, err := sb.sectors.sindex.StorageFindSector(ctx, sector.ID, ft, 0, false)
+    stores, err := sb.Sectors.sindex.StorageFindSector(ctx, sector.ID, ft, 0, false)
     if err != nil {
         return false, "", xerrors.Errorf("finding sector: %w", err)
     }
@@ -718,7 +729,7 @@ func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.Sec
 
 // PreFetch fetches the sector file to local storage before SDR and TreeRC Tasks
 func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, task *harmonytask.TaskID) (fsPath, pathID storiface.SectorPaths, releaseSector func(...storiface.SectorFileType), err error) {
-    fsPath, pathID, releaseSector, err = sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
+    fsPath, pathID, releaseSector, err = sb.Sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
     if err != nil {
         return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector paths: %w", err)
     }
@@ -754,7 +765,7 @@ func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unse
 }
 
 func (sb *SealCalls) SyntheticProofs(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, sealed cid.Cid, unsealed cid.Cid, randomness abi.SealRandomness, pieces []abi.PieceInfo) error {
-    fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
+    fspaths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, task, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
     if err != nil {
         return xerrors.Errorf("acquiring sector paths: %w", err)
     }
diff --git a/lib/ffi/snap_funcs.go b/lib/ffi/snap_funcs.go
index 7cbfc707d..acdaedd49 100644
--- a/lib/ffi/snap_funcs.go
+++ b/lib/ffi/snap_funcs.go
@@ -41,7 +41,7 @@ func (sb *SealCalls) EncodeUpdate(
         noDecl = storiface.FTUnsealed
     }
 
-    paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTUnsealed, storiface.PathSealing)
+    paths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTUnsealed, storiface.PathSealing)
     if err != nil {
         return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
     }
@@ -133,7 +133,7 @@ func (sb *SealCalls) EncodeUpdate(
 
     log.Debugw("get key data", "keyPath", keyPath, "keyCachePath", keyCachePath, "sectorID", sector.ID, "taskID", taskID)
 
-    r, err := sb.sectors.storage.ReaderSeq(ctx, sector, storiface.FTSealed)
+    r, err := sb.Sectors.storage.ReaderSeq(ctx, sector, storiface.FTSealed)
     if err != nil {
         return cid.Undef, cid.Undef, xerrors.Errorf("getting sealed sector reader: %w", err)
     }
@@ -177,7 +177,7 @@ func (sb *SealCalls) EncodeUpdate(
 
     // fetch cache
     var buf bytes.Buffer // usually 73.2 MiB
-    err = sb.sectors.storage.ReadMinCacheInto(ctx, sector, storiface.FTCache, &buf)
+    err = sb.Sectors.storage.ReadMinCacheInto(ctx, sector, storiface.FTCache, &buf)
     if err != nil {
         return cid.Undef, cid.Undef, xerrors.Errorf("reading cache: %w", err)
     }
@@ -269,7 +269,7 @@ func (sb *SealCalls) EncodeUpdate(
 }
 
 func (sb *SealCalls) ProveUpdate(ctx context.Context, proofType abi.RegisteredUpdateProof, sector storiface.SectorRef, key, sealed, unsealed cid.Cid) ([]byte, error) {
-    jsonb, err := sb.sectors.storage.ReadSnapVanillaProof(ctx, sector)
+    jsonb, err := sb.Sectors.storage.ReadSnapVanillaProof(ctx, sector)
     if err != nil {
         return nil, xerrors.Errorf("read snap vanilla proof: %w", err)
     }
@@ -301,11 +301,16 @@ func (sb *SealCalls) MoveStorageSnap(ctx context.Context, sector storiface.Secto
 
     var opts []storiface.AcquireOption
     if taskID != nil {
-        resv, ok := sb.sectors.storageReservations.Load(*taskID)
+        resvs, ok := sb.Sectors.storageReservations.Load(*taskID)
         // if the reservation is missing MoveStorage will simply create one internally. This is fine as the reservation
         // will only be missing when the node is restarting, which means that the missing reservations will get recreated
         // anyways, and before we start claiming other tasks.
         if ok {
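+            // Same expectation as in MoveStorage (sdr_funcs.go): snap move
+            // tasks carry exactly one reservation per task.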
+            if len(resvs) != 1 {
+                return xerrors.Errorf("task %d has %d reservations, expected 1", *taskID, len(resvs))
+            }
+            resv := resvs[0]
+
             defer resv.Release()
 
             if resv.Alloc != storiface.FTNone {
@@ -319,13 +324,13 @@ func (sb *SealCalls) MoveStorageSnap(ctx context.Context, sector storiface.Secto
         }
     }
 
-    err := sb.sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
+    err := sb.Sectors.storage.MoveStorage(ctx, sector, toMove, opts...)
     if err != nil {
         return xerrors.Errorf("moving storage: %w", err)
     }
 
     for _, fileType := range toMove.AllSet() {
-        if err := sb.sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
+        if err := sb.Sectors.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
             return xerrors.Errorf("rm copies (t:%s, s:%v): %w", fileType, sector, err)
         }
     }
diff --git a/lib/ffi/task_storage.go b/lib/ffi/task_storage.go
index 8924eea87..666ac7f66 100644
--- a/lib/ffi/task_storage.go
+++ b/lib/ffi/task_storage.go
@@ -42,7 +42,7 @@ type TaskStorage struct {
     ssize    abi.SectorSize
     pathType storiface.PathType
 
-    taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error)
+    taskToSectorRef func(taskID harmonytask.TaskID) ([]SectorRef, error)
 
     // Minimum free storage percentage cutoff for reservation rejection
     MinFreeStoragePercentage float64
@@ -60,6 +60,17 @@ type StorageReservation struct {
 }
 
 func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, MinFreeStoragePercentage float64) *TaskStorage {
+    return sb.StorageMulti(func(taskID harmonytask.TaskID) ([]SectorRef, error) {
+        sr, err := taskToSectorRef(taskID)
+        if err != nil {
+            return nil, err
+        }
+
+        return []SectorRef{sr}, nil
+    }, alloc, existing, ssize, pathType, MinFreeStoragePercentage)
+}
+
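+// StorageMulti is the multi-sector variant of Storage: taskToSectorRef may
+// return several sectors for a single task, and Claim will lock and reserve
+// storage for each of them. Batch sealing tasks use this to claim space for a
+// whole batch at once.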
+func (sb *SealCalls) StorageMulti(taskToSectorRef func(taskID harmonytask.TaskID) ([]SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, MinFreeStoragePercentage float64) *TaskStorage {
     return &TaskStorage{
         sc:       sb,
         alloc:    alloc,
@@ -74,13 +85,13 @@ func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (Se
 func (t *TaskStorage) HasCapacity() bool {
     ctx := context.Background()
 
-    paths, err := t.sc.sectors.sindex.StorageBestAlloc(ctx, t.alloc, t.ssize, t.pathType, storagePaths.NoMinerFilter)
+    paths, err := t.sc.Sectors.sindex.StorageBestAlloc(ctx, t.alloc, t.ssize, t.pathType, storagePaths.NoMinerFilter)
     if err != nil {
         log.Errorf("finding best alloc in HasCapacity: %+v", err)
         return false
     }
 
-    local, err := t.sc.sectors.localStore.Local(ctx)
+    local, err := t.sc.Sectors.localStore.Local(ctx)
     if err != nil {
         log.Errorf("getting local storage: %+v", err)
         return false
@@ -123,7 +134,7 @@ func (t *TaskStorage) Claim(taskID int) (func() error, error) {
 
     ctx := context.Background()
 
-    sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID))
+    sectorRefs, err := t.taskToSectorRef(harmonytask.TaskID(taskID))
     if err != nil {
         return nil, xerrors.Errorf("getting sector ref: %w", err)
     }
@@ -133,7 +144,7 @@ func (t *TaskStorage) Claim(taskID int) (func() error, error) {
 
     requestedTypes := t.alloc | t.existing
 
-    lockAcquireTimuout := time.Second * 10
+    lockAcquireTimuout := time.Second * 60
     lockAcquireTimer := time.NewTimer(lockAcquireTimuout)
 
     go func() {
@@ -145,9 +156,11 @@ func (t *TaskStorage) Claim(taskID int) (func() error, error) {
         }
     }()
 
-    if err := t.sc.sectors.sindex.StorageLock(lkctx, sectorRef.ID(), storiface.FTNone, requestedTypes); err != nil {
-        // timer will expire
-        return nil, xerrors.Errorf("claim StorageLock: %w", err)
+    for _, sectorRef := range sectorRefs {
+        if err := t.sc.Sectors.sindex.StorageLock(lkctx, sectorRef.ID(), storiface.FTNone, requestedTypes); err != nil {
+            // timer will expire
+            return nil, xerrors.Errorf("claim StorageLock: %w", err)
+        }
     }
 
     if !lockAcquireTimer.Stop() {
@@ -159,70 +172,87 @@ func (t *TaskStorage) Claim(taskID int) (func() error, error) {
         lockAcquireTimer.Reset(0)
     }()
 
-    // First see what we have locally. We are putting allocate and existing together because local acquire will look
-    // for existing files for allocate requests, separately existing files which aren't found locally will be need to
-    // be fetched, so we will need to create reservations for that too.
-    // NOTE localStore.AcquireSector does not open or create any files, nor does it reserve space. It only proposes
-    // paths to be used.
-    pathsFs, pathIDs, err := t.sc.sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, requestedTypes, t.pathType, storiface.AcquireMove)
-    if err != nil {
-        return nil, err
-    }
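+    // Reservations below are made per sector; if any step fails partway
+    // through the batch, the accumulated cleanup closure releases everything
+    // reserved so far.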
+    cleanup := func() {}
-    // reserve the space
-    release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal, t.MinFreeStoragePercentage)
-    if err != nil {
-        return nil, err
-    }
+
+    defer func() {
+        cleanup()
+    }()
-    var releaseOnce sync.Once
-    releaseFunc := func() {
-        releaseOnce.Do(release)
-    }
+
+    var resvs []*StorageReservation
+
+    for _, sectorRef := range sectorRefs {
+        // First see what we have locally. We are putting allocate and existing together because local acquire will look
+        // for existing files for allocate requests; separately, existing files which aren't found locally will need to
+        // be fetched, so we will need to create reservations for that too.
+        // NOTE localStore.AcquireSector does not open or create any files, nor does it reserve space. It only proposes
+        // paths to be used.
+        pathsFs, pathIDs, err := t.sc.Sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, requestedTypes, t.pathType, storiface.AcquireMove)
+        if err != nil {
+            return nil, err
+        }
+
+        // reserve the space
+        release, err := t.sc.Sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal, t.MinFreeStoragePercentage)
+        if err != nil {
+            return nil, err
+        }
+
+        prevCleanup := cleanup
+        cleanup = func() {
+            prevCleanup()
+            release()
+        }
+
+        var releaseOnce sync.Once
+        releaseFunc := func() {
+            releaseOnce.Do(release)
+        }
+
+        sres := &StorageReservation{
+            SectorRef: sectorRef,
+            Release:   releaseFunc,
+            Paths:     pathsFs,
+            PathIDs:   pathIDs,
-    sres := &StorageReservation{
-        SectorRef: sectorRef,
-        Release:   releaseFunc,
-        Paths:     pathsFs,
-        PathIDs:   pathIDs,
+            Alloc:    t.alloc,
+            Existing: t.existing,
+        }
-        Alloc:    t.alloc,
-        Existing: t.existing,
+        resvs = append(resvs, sres)
+        log.Debugw("claimed storage", "task_id", taskID, "sector", sectorRef.ID(), "paths", pathsFs)
     }
 
-    t.sc.sectors.storageReservations.Store(harmonytask.TaskID(taskID), sres)
+    cleanup = func() {}
 
-    log.Debugw("claimed storage", "task_id", taskID, "sector", sectorRef.ID(), "paths", pathsFs)
+    t.sc.Sectors.storageReservations.Store(harmonytask.TaskID(taskID), resvs)
 
     // note: we drop the sector writelock on return; THAT IS INTENTIONAL, this code runs in CanAccept, which doesn't
     // guarantee that the work for this sector will happen on this node; SDR CanAccept just ensures that the node can
     // run the job, harmonytask is what ensures that only one SDR runs at a time
     return func() error {
-        return t.markComplete(taskID, sectorRef)
+        return t.markComplete(taskID, sectorRefs)
     }, nil
 }
 
-func (t *TaskStorage) markComplete(taskID int, sectorRef SectorRef) error {
+func (t *TaskStorage) markComplete(taskID int, sectorRefs []SectorRef) error {
     // MarkComplete is ALWAYS called after the task is done or not scheduled
     // If Claim is called and returns without errors, MarkComplete with the same
     // taskID is guaranteed to eventually be called
-    sres, ok := t.sc.sectors.storageReservations.Load(harmonytask.TaskID(taskID))
+    sres, ok := t.sc.Sectors.storageReservations.Load(harmonytask.TaskID(taskID))
     if !ok {
         return xerrors.Errorf("no reservation found for task %d", taskID)
     }
 
-    if sectorRef != sres.SectorRef {
-        return xerrors.Errorf("reservation sector ref doesn't match task sector ref: %+v != %+v", sectorRef, sres.SectorRef)
-    }
-
-    log.Debugw("marking storage complete", "task_id", taskID, "sector", sectorRef.ID(), "paths", sres.Paths)
+    log.Debugw("marking storage complete", "task_id", taskID, "sector", sectorRefs, "sres", sres)
 
     // remove the reservation
-    t.sc.sectors.storageReservations.Delete(harmonytask.TaskID(taskID))
+    t.sc.Sectors.storageReservations.Delete(harmonytask.TaskID(taskID))
 
     // release the reservation
-    sres.Release()
+    for _, res := range sres {
+        res.Release()
+    }
 
     // note: this only frees the reservation, allocated sectors are declared in AcquireSector which is aware of
     // the reservation
diff --git a/lib/ffi/unseal_funcs.go b/lib/ffi/unseal_funcs.go
index 8879b0f6b..778ce3664 100644
--- a/lib/ffi/unseal_funcs.go
+++ b/lib/ffi/unseal_funcs.go
@@ -18,18 +18,18 @@ func (sb *SealCalls) decodeCommon(ctx context.Context, taskID harmonytask.TaskID
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
 
-    paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTUnsealed, storiface.PathStorage)
+    paths, pathIDs, releaseSector, err := sb.Sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTUnsealed, storiface.PathStorage)
     if err != nil {
         return xerrors.Errorf("acquiring sector paths: %w", err)
     }
     defer releaseSector()
 
-    sealReader, err := sb.sectors.storage.ReaderSeq(ctx, sector, fileType)
+    sealReader, err := sb.Sectors.storage.ReaderSeq(ctx, sector, fileType)
     if err != nil {
         return xerrors.Errorf("getting sealed sector reader: %w", err)
     }
 
-    keyReader, err := sb.sectors.storage.ReaderSeq(ctx, sector, storiface.FTKey)
+    keyReader, err := sb.Sectors.storage.ReaderSeq(ctx, sector, storiface.FTKey)
     if err != nil {
         return xerrors.Errorf("getting key reader: %w", err)
     }
@@ -66,7 +66,7 @@ func (sb *SealCalls) decodeCommon(ctx context.Context, taskID harmonytask.TaskID
         return xerrors.Errorf("ensure one copy: %w", err)
     }
 
-    if err := sb.sectors.storage.Remove(ctx, sector.ID, storiface.FTKey, true, nil); err != nil {
+    if err := sb.Sectors.storage.Remove(ctx, sector.ID, storiface.FTKey, true, nil); err != nil {
         return err
     }
diff --git a/tasks/sealsupra/task_supraseal.go b/tasks/sealsupra/task_supraseal.go
index 157eff756..e37ac13e9 100644
--- a/tasks/sealsupra/task_supraseal.go
+++ b/tasks/sealsupra/task_supraseal.go
@@ -5,6 +5,7 @@ import (
     "encoding/hex"
     "encoding/json"
     "fmt"
+    "github.com/filecoin-project/curio/lib/ffi"
     "os"
     "path/filepath"
     "time"
@@ -50,6 +51,7 @@ type SupraSeal struct {
     api     SupraSealNodeAPI
     storage *paths.Remote
     sindex  paths.SectorIndex
+    sc      *ffi.SealCalls
 
     pipelines int // 1 or 2
     sectors   int // sectors in a batch
@@ -269,6 +271,11 @@ func (s *SupraSeal) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
     outPathIDs := make([]storiface.SectorPaths, len(sectors))
     alloc := storiface.FTSealed | storiface.FTCache
 
+    releaseStorage := func() {}
+    defer func() {
+        releaseStorage()
+    }()
+
     for i, t := range sectors {
         sid := abi.SectorID{
             Miner:  abi.ActorID(t.SpID),
@@ -310,10 +317,15 @@ func (s *SupraSeal) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
 
         ctx := context.WithValue(ctx, paths.SpaceUseKey, paths.SpaceUseFunc(SupraSpaceUse))
 
-        ps, pathIDs, err := s.storage.AcquireSector(ctx, sref, storiface.FTNone, alloc, storiface.PathSealing, storiface.AcquireMove)
+        ps, pathIDs, release, err := s.sc.Sectors.AcquireSector(ctx, &taskID, sref, storiface.FTNone, alloc, storiface.PathSealing)
         if err != nil {
             return false, xerrors.Errorf("acquiring sector storage: %w", err)
         }
+        prevReleaseStorage := releaseStorage
+        releaseStorage = func() {
+            release()
+            prevReleaseStorage()
+        }
 
         outPaths[i] = supraffi.Path{
             Replica: ps.Sealed,
@@ -498,13 +510,19 @@ var ssizeToName = map[abi.SectorSize]string{
 }
 
 func (s *SupraSeal) TypeDetails() harmonytask.TaskTypeDetails {
+    ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size
+    if seal.IsDevnet {
+        ssize = abi.SectorSize(2 << 20)
+    }
+
     return harmonytask.TaskTypeDetails{
         Max:  taskhelp.Max(s.pipelines),
         Name: fmt.Sprintf("Batch%d-%s", s.sectors, ssizeToName[must.One(s.spt.SectorSize())]),
         Cost: resources.Resources{
-            Cpu: 1,
-            Gpu: 0,
-            Ram: 16 << 30,
+            Cpu:     1,
+            Gpu:     0,
+            Ram:     16 << 30,
+            Storage: s.sc.StorageMulti(s.taskToSectors, storiface.FTCache|storiface.FTSealed, storiface.FTNone, ssize, storiface.PathSealing, paths.MinFreeStoragePercentage),
         },
         MaxFailures: 4,
         IAmBored:    passcall.Every(30*time.Second, s.schedule),
@@ -569,6 +587,17 @@ func (s *SupraSeal) schedule(taskFunc harmonytask.AddTaskFunc) error {
 
     return nil
 }
 
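+// taskToSectors returns every sector belonging to the given batch task; a
+// sector is part of the batch when all of its SDR/Tree pipeline task IDs
+// point at this task.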
+func (s *SupraSeal) taskToSectors(id harmonytask.TaskID) ([]ffi.SectorRef, error) {
+    var sectors []ffi.SectorRef
+
+    err := s.db.Select(context.Background(), &sectors, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_sdr = $1 AND task_id_tree_r = $1 AND task_id_tree_c = $1 AND task_id_tree_d = $1`, id)
+    if err != nil {
+        return nil, xerrors.Errorf("getting sector params: %w", err)
+    }
+
+    return sectors, nil
+}
+
 var FSOverheadSupra = map[storiface.SectorFileType]int{ // 10x overheads
     storiface.FTUnsealed: storiface.FSOverheadDen,
     storiface.FTSealed:   storiface.FSOverheadDen,