diff options
author | Julio Capote <jcapote@gmail.com> | 2023-01-02 00:38:31 +0000 |
---|---|---|
committer | Julio Capote <jcapote@gmail.com> | 2023-01-02 00:38:31 +0000 |
commit | a69f9cfc6ba6ff332d1e2d8303020d49443ca8cb (patch) | |
tree | a487561c677a1c54fecae70fa5f46294449bd1b4 | |
parent | bce250b0e75812c4f61b925d898c320d6ef11c5c (diff) | |
download | communique-a69f9cfc6ba6ff332d1e2d8303020d49443ca8cb.tar.gz |
working dedup
-rw-r--r-- | cgi/servers.go | 2 | ||||
-rw-r--r-- | config/config.go | 9 | ||||
-rw-r--r-- | main.go | 8 | ||||
-rw-r--r-- | models/dedup_item.go | 14 | ||||
-rw-r--r-- | models/model.go | 3 | ||||
-rw-r--r-- | models/outbox_item.go | 29 | ||||
-rw-r--r-- | models/persister.go | 22 | ||||
-rw-r--r-- | sample-config.toml | 3 |
8 files changed, 68 insertions, 22 deletions
diff --git a/cgi/servers.go b/cgi/servers.go index c751135..58d72c1 100644 --- a/cgi/servers.go +++ b/cgi/servers.go @@ -62,7 +62,7 @@ func startCGIServer(h config.Handler) { } func startTicker(h config.Handler, persister *models.Persister, log *zap.SugaredLogger) { - ticker := time.NewTicker(h.Interval) + ticker := time.NewTicker(h.Interval) // TODO add some random jitter here so handlers dont run at the same exact intervals done := make(chan bool) func() { for { diff --git a/config/config.go b/config/config.go index e8e5387..3c6cdde 100644 --- a/config/config.go +++ b/config/config.go @@ -9,8 +9,9 @@ type Config struct { } type Handler struct { - Name string - Exec string - Rpc string - Interval time.Duration + Name string + Exec string + Rpc string + DedupWindow time.Duration + Interval time.Duration } @@ -39,6 +39,14 @@ func main() { } defer db.Close() + log.With("type", "db").Debug("Running GC") + err = db.RunValueLogGC(0.5) + if err == badger.ErrNoRewrite { + log.With("type", "db").Debug("Nothing to GC") + } else { + log.Fatal(err) + } + // Persister persister := models.NewPersister(log, db) diff --git a/models/dedup_item.go b/models/dedup_item.go deleted file mode 100644 index b079181..0000000 --- a/models/dedup_item.go +++ /dev/null @@ -1,14 +0,0 @@ -package models - -// func NewActivityStreamsObject(obj vocab.ActivityStreamsObject) *ActivityStreamsObject { -// aso := &ActivityStreamsObject{wrappedObject: obj} -// return aso -// } - -// func (a *ActivityStreamsObject) Id() []byte { - -// } - -// func (a *ActivityStreamsObject) Value() []byte { - -// } diff --git a/models/model.go b/models/model.go index 354b3bd..8832ce1 100644 --- a/models/model.go +++ b/models/model.go @@ -3,7 +3,10 @@ package models import "github.com/dgraph-io/badger/v3" type model interface { + Name() string Save(txn *badger.Txn) error + SaveDedup(txn *badger.Txn) error Keybase() string Key() string + DedupKey() string } diff --git a/models/outbox_item.go b/models/outbox_item.go index 9d9de97..34457d6 100644 --- a/models/outbox_item.go +++ b/models/outbox_item.go @@ -2,6 +2,7 @@ package models import ( "bytes" + "crypto/sha256" "encoding/gob" "fmt" "time" @@ -15,6 +16,7 @@ type OutboxItem struct { Handler config.Handler Content []byte Id []byte + Sha256 string CreatedAt time.Time } @@ -26,25 +28,48 @@ func NewOutboxItem(h config.Handler) *OutboxItem { func CreateOutboxItem(h config.Handler, content []byte) *OutboxItem { t := time.Now() + hash := sha256.New() + hash.Write(content) + binHash := hash.Sum(nil) k, _ := ksuid.NewRandomWithTime(t) aso := &OutboxItem{ Handler: h, CreatedAt: t, Content: content, + Sha256: fmt.Sprintf("%x", binHash), Id: []byte(k.String()), // NOTE: we want the bytes of the string representation of a hash, NOT a binary hash } return aso } +func (a *OutboxItem) Name() string { + return "OutboxItem" +} + func (a *OutboxItem) Key() string { return fmt.Sprintf("%s:%s", a.Keybase(), a.Id) } +func (a *OutboxItem) DedupKey() string { + return fmt.Sprintf("dd:%s:sha256:%s", a.Keybase(), a.Sha256) +} + func (a *OutboxItem) Keybase() string { keyBase := fmt.Sprintf("outboxes:%s", a.Handler.Name) return keyBase } +func (a *OutboxItem) SaveDedup(txn *badger.Txn) error { + if len(a.Content) == 0 { + return fmt.Errorf("content not set") + } + if len(a.Sha256) == 0 { + return fmt.Errorf("sha256 not set") + } + e := badger.NewEntry([]byte(a.DedupKey()), nil).WithTTL(a.Handler.DedupWindow) + return txn.SetEntry(e) +} + func (a *OutboxItem) Save(txn *badger.Txn) error { if len(a.Content) == 0 { return fmt.Errorf("content not set") @@ -52,6 +77,10 @@ func (a *OutboxItem) Save(txn *badger.Txn) error { if len(a.Id) == 0 { return fmt.Errorf("id not set") } + if len(a.Sha256) == 0 { + return fmt.Errorf("sha256 not set") + } + var network bytes.Buffer enc := gob.NewEncoder(&network) err := enc.Encode(a) diff --git a/models/persister.go b/models/persister.go index 69e79f4..60c25b5 100644 --- a/models/persister.go +++ b/models/persister.go @@ -16,12 +16,30 @@ type PersisterResult struct { } func NewPersister(log *zap.SugaredLogger, db *badger.DB) *Persister { - aso := &Persister{log: log, db: db} + logger := log.With("type", "db") + aso := &Persister{log: logger, db: db} return aso } func (p *Persister) Store(model model) error { - err := p.db.Update(model.Save) + log := p.log.With("model", model.Name()) + log.Debug("store") + err := p.db.View(func(txn *badger.Txn) error { + var getErr error + var updateErr error + log.With("DedupKey", model.DedupKey()).Debug("checking") + _, getErr = txn.Get([]byte(model.DedupKey())) + if getErr == badger.ErrKeyNotFound { + log.With("DedupKey", model.DedupKey()).Debug("not found") + updateErr = p.db.Update(model.Save) // stores the outbox item + if updateErr != nil { + return updateErr + } + log.With("DedupKey", model.DedupKey()).Debug("saving") + updateErr = p.db.Update(model.SaveDedup) // stores the sha256 + } + return updateErr + }) return err } diff --git a/sample-config.toml b/sample-config.toml index 218660a..a77678c 100644 --- a/sample-config.toml +++ b/sample-config.toml @@ -6,7 +6,8 @@ name = "sample" rpc = "cgi" # rename to protocol? # add response type? like Note exec = "sample-cgi-handler.sh" -interval = "55s" +interval = "5s" +dedupWindow = "1m" # [[handlers]] # name = "another" |