diff options
author | Julio Capote <jcapote@gmail.com> | 2022-12-31 02:20:02 +0000 |
---|---|---|
committer | Julio Capote <jcapote@gmail.com> | 2022-12-31 02:20:02 +0000 |
commit | c500a2be38afcbb5688537d97c7c3ee30a57dba4 (patch) | |
tree | 79cf1884ca7529299646b567b7705378bdf08fd3 /models | |
parent | 74ffcfe6b2c80b7cf459798dc42bd278075ccb50 (diff) | |
download | communique-c500a2be38afcbb5688537d97c7c3ee30a57dba4.tar.gz |
parse and persist feeds from handlers
Diffstat (limited to '')
-rw-r--r-- | models/activity_streams_object.go | 54 | ||||
-rw-r--r-- | models/outbox_item.go | 56 | ||||
-rw-r--r-- | models/persister.go | 12 |
3 files changed, 65 insertions, 57 deletions
diff --git a/models/activity_streams_object.go b/models/activity_streams_object.go deleted file mode 100644 index 98c69bb..0000000 --- a/models/activity_streams_object.go +++ /dev/null @@ -1,54 +0,0 @@ -package models - -import ( - "bytes" - "encoding/json" - "fmt" - "time" - - "git.capotej.com/capotej/communique/config" - "github.com/dgraph-io/badger/v3" - "github.com/go-fed/activity/streams/vocab" - "github.com/segmentio/ksuid" -) - -// ActivityStreamsObject is our internal model for storing activity streams objects -// For every object we receive, we create 2+N copies of it in Badger: -// - the original object -// - a copy for dedupe with an optional TTL (specified by handler response) TODO -// - a temporary copy for every subscriber until it is delivered TODO -type ActivityStreamsObject struct { - wrappedObject vocab.ActivityStreamsNote // OPTIONAL, not always wrapped (think .count) - handler config.Handler -} - -func NewActivityStreamsObject(obj vocab.ActivityStreamsNote, h config.Handler) *ActivityStreamsObject { - aso := &ActivityStreamsObject{wrappedObject: obj, handler: h} - return aso -} - -func (a *ActivityStreamsObject) keyName() []byte { - k, _ := ksuid.NewRandomWithTime(time.Now()) - key := fmt.Sprintf("%s:%s", a.Keybase(), k.String()) - return []byte(key) -} - -func (a *ActivityStreamsObject) Keybase() string { - keyBase := fmt.Sprintf("outboxes:%s", a.handler.Name) - return keyBase -} - -func (a *ActivityStreamsObject) content() []byte { - objMap, _ := a.wrappedObject.Serialize() - buffer := &bytes.Buffer{} - encoder := json.NewEncoder(buffer) - encoder.SetEscapeHTML(false) - encoder.Encode(objMap) - return bytes.TrimRight(buffer.Bytes(), "\n") -} - -func (a *ActivityStreamsObject) Save(txn *badger.Txn) error { - content := a.content() - e := badger.NewEntry(a.keyName(), content) - return txn.SetEntry(e) -} diff --git a/models/outbox_item.go b/models/outbox_item.go new file mode 100644 index 0000000..ed74130 --- /dev/null +++ b/models/outbox_item.go @@ -0,0 +1,56 @@ +package models + +import ( + "fmt" + "time" + + "git.capotej.com/capotej/communique/config" + "github.com/dgraph-io/badger/v3" + "github.com/segmentio/ksuid" +) + +type OutboxItem struct { + handler config.Handler + content []byte + id []byte + createdAt time.Time +} + +// used for lookup purposes (count, collect) +func NewOutboxItem(h config.Handler) *OutboxItem { + aso := &OutboxItem{handler: h} + return aso +} + +func CreateOutboxItem(h config.Handler, content []byte) *OutboxItem { + t := time.Now() + k, _ := ksuid.NewRandomWithTime(t) + aso := &OutboxItem{ + handler: h, + createdAt: t, + content: content, + id: k.Bytes(), + } + return aso +} + +func (a *OutboxItem) keyName() []byte { + key := fmt.Sprintf("%s:%s", a.Keybase(), a.id) + return []byte(key) +} + +func (a *OutboxItem) Keybase() string { + keyBase := fmt.Sprintf("outboxes:%s", a.handler.Name) + return keyBase +} + +func (a *OutboxItem) Save(txn *badger.Txn) error { + if len(a.content) == 0 { + return fmt.Errorf("content not set") + } + if len(a.id) == 0 { + return fmt.Errorf("id not set") + } + e := badger.NewEntry(a.keyName(), a.content) + return txn.SetEntry(e) +} diff --git a/models/persister.go b/models/persister.go index 3729c81..a639fc7 100644 --- a/models/persister.go +++ b/models/persister.go @@ -10,6 +10,11 @@ type Persister struct { db *badger.DB } +type PersisterResult struct { + Value []byte + Key []byte +} + func NewPersister(log *zap.SugaredLogger, db *badger.DB) *Persister { aso := &Persister{log: log, db: db} return aso @@ -36,8 +41,8 @@ func (p *Persister) Count(model model) (int, error) { return count, err } -func (p *Persister) Collect(model model) ([]string, error) { - var result []string +func (p *Persister) Collect(model model) ([]PersisterResult, error) { + var result []PersisterResult err := p.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.PrefetchValues = false // TODO Maybe we want true here @@ -47,7 +52,8 @@ func (p *Persister) Collect(model model) ([]string, error) { for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { item := it.Item() item.Value(func(v []byte) error { - result = append(result, string(v)) + pr := PersisterResult{Key: it.Item().Key(), Value: v} + result = append(result, pr) return nil }) } |