From a69f9cfc6ba6ff332d1e2d8303020d49443ca8cb Mon Sep 17 00:00:00 2001 From: Julio Capote Date: Sun, 1 Jan 2023 19:38:31 -0500 Subject: working dedup --- models/dedup_item.go | 14 -------------- models/model.go | 3 +++ models/outbox_item.go | 29 +++++++++++++++++++++++++++++ models/persister.go | 22 ++++++++++++++++++++-- 4 files changed, 52 insertions(+), 16 deletions(-) delete mode 100644 models/dedup_item.go (limited to 'models') diff --git a/models/dedup_item.go b/models/dedup_item.go deleted file mode 100644 index b079181..0000000 --- a/models/dedup_item.go +++ /dev/null @@ -1,14 +0,0 @@ -package models - -// func NewActivityStreamsObject(obj vocab.ActivityStreamsObject) *ActivityStreamsObject { -// aso := &ActivityStreamsObject{wrappedObject: obj} -// return aso -// } - -// func (a *ActivityStreamsObject) Id() []byte { - -// } - -// func (a *ActivityStreamsObject) Value() []byte { - -// } diff --git a/models/model.go b/models/model.go index 354b3bd..8832ce1 100644 --- a/models/model.go +++ b/models/model.go @@ -3,7 +3,10 @@ package models import "github.com/dgraph-io/badger/v3" type model interface { + Name() string Save(txn *badger.Txn) error + SaveDedup(txn *badger.Txn) error Keybase() string Key() string + DedupKey() string } diff --git a/models/outbox_item.go b/models/outbox_item.go index 9d9de97..34457d6 100644 --- a/models/outbox_item.go +++ b/models/outbox_item.go @@ -2,6 +2,7 @@ package models import ( "bytes" + "crypto/sha256" "encoding/gob" "fmt" "time" @@ -15,6 +16,7 @@ type OutboxItem struct { Handler config.Handler Content []byte Id []byte + Sha256 string CreatedAt time.Time } @@ -26,25 +28,48 @@ func NewOutboxItem(h config.Handler) *OutboxItem { func CreateOutboxItem(h config.Handler, content []byte) *OutboxItem { t := time.Now() + hash := sha256.New() + hash.Write(content) + binHash := hash.Sum(nil) k, _ := ksuid.NewRandomWithTime(t) aso := &OutboxItem{ Handler: h, CreatedAt: t, Content: content, + Sha256: fmt.Sprintf("%x", binHash), Id: []byte(k.String()), // NOTE: we want the bytes of the string representation of a hash, NOT a binary hash } return aso } +func (a *OutboxItem) Name() string { + return "OutboxItem" +} + func (a *OutboxItem) Key() string { return fmt.Sprintf("%s:%s", a.Keybase(), a.Id) } +func (a *OutboxItem) DedupKey() string { + return fmt.Sprintf("dd:%s:sha256:%s", a.Keybase(), a.Sha256) +} + func (a *OutboxItem) Keybase() string { keyBase := fmt.Sprintf("outboxes:%s", a.Handler.Name) return keyBase } +func (a *OutboxItem) SaveDedup(txn *badger.Txn) error { + if len(a.Content) == 0 { + return fmt.Errorf("content not set") + } + if len(a.Sha256) == 0 { + return fmt.Errorf("sha256 not set") + } + e := badger.NewEntry([]byte(a.DedupKey()), nil).WithTTL(a.Handler.DedupWindow) + return txn.SetEntry(e) +} + func (a *OutboxItem) Save(txn *badger.Txn) error { if len(a.Content) == 0 { return fmt.Errorf("content not set") @@ -52,6 +77,10 @@ func (a *OutboxItem) Save(txn *badger.Txn) error { if len(a.Id) == 0 { return fmt.Errorf("id not set") } + if len(a.Sha256) == 0 { + return fmt.Errorf("sha256 not set") + } + var network bytes.Buffer enc := gob.NewEncoder(&network) err := enc.Encode(a) diff --git a/models/persister.go b/models/persister.go index 69e79f4..60c25b5 100644 --- a/models/persister.go +++ b/models/persister.go @@ -16,12 +16,30 @@ type PersisterResult struct { } func NewPersister(log *zap.SugaredLogger, db *badger.DB) *Persister { - aso := &Persister{log: log, db: db} + logger := log.With("type", "db") + aso := &Persister{log: logger, db: db} return aso } func (p *Persister) Store(model model) error { - err := p.db.Update(model.Save) + log := p.log.With("model", model.Name()) + log.Debug("store") + err := p.db.View(func(txn *badger.Txn) error { + var getErr error + var updateErr error + log.With("DedupKey", model.DedupKey()).Debug("checking") + _, getErr = txn.Get([]byte(model.DedupKey())) + if getErr == badger.ErrKeyNotFound { + log.With("DedupKey", model.DedupKey()).Debug("not found") + updateErr = p.db.Update(model.Save) // stores the outbox item + if updateErr != nil { + return updateErr + } + log.With("DedupKey", model.DedupKey()).Debug("saving") + updateErr = p.db.Update(model.SaveDedup) // stores the sha256 + } + return updateErr + }) return err } -- cgit v1.2.3