aboutsummaryrefslogtreecommitdiff
path: root/models
diff options
context:
space:
mode:
authorJulio Capote <jcapote@gmail.com>2022-12-31 02:20:02 +0000
committerJulio Capote <jcapote@gmail.com>2022-12-31 02:20:02 +0000
commitc500a2be38afcbb5688537d97c7c3ee30a57dba4 (patch)
tree79cf1884ca7529299646b567b7705378bdf08fd3 /models
parent74ffcfe6b2c80b7cf459798dc42bd278075ccb50 (diff)
downloadcommunique-c500a2be38afcbb5688537d97c7c3ee30a57dba4.tar.gz
parse and persist feeds from handlers
Diffstat (limited to 'models')
-rw-r--r--models/activity_streams_object.go54
-rw-r--r--models/outbox_item.go56
-rw-r--r--models/persister.go12
3 files changed, 65 insertions, 57 deletions
diff --git a/models/activity_streams_object.go b/models/activity_streams_object.go
deleted file mode 100644
index 98c69bb..0000000
--- a/models/activity_streams_object.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package models
-
-import (
- "bytes"
- "encoding/json"
- "fmt"
- "time"
-
- "git.capotej.com/capotej/communique/config"
- "github.com/dgraph-io/badger/v3"
- "github.com/go-fed/activity/streams/vocab"
- "github.com/segmentio/ksuid"
-)
-
-// ActivityStreamsObject is our internal model for storing activity streams objects
-// For every object we receive, we create 2+N copies of it in Badger:
-// - the original object
-// - a copy for dedupe with an optional TTL (specified by handler response) TODO
-// - a temporary copy for every subscriber until it is delivered TODO
-type ActivityStreamsObject struct {
- wrappedObject vocab.ActivityStreamsNote // OPTIONAL, not always wrapped (think .count)
- handler config.Handler
-}
-
-func NewActivityStreamsObject(obj vocab.ActivityStreamsNote, h config.Handler) *ActivityStreamsObject {
- aso := &ActivityStreamsObject{wrappedObject: obj, handler: h}
- return aso
-}
-
-func (a *ActivityStreamsObject) keyName() []byte {
- k, _ := ksuid.NewRandomWithTime(time.Now())
- key := fmt.Sprintf("%s:%s", a.Keybase(), k.String())
- return []byte(key)
-}
-
-func (a *ActivityStreamsObject) Keybase() string {
- keyBase := fmt.Sprintf("outboxes:%s", a.handler.Name)
- return keyBase
-}
-
-func (a *ActivityStreamsObject) content() []byte {
- objMap, _ := a.wrappedObject.Serialize()
- buffer := &bytes.Buffer{}
- encoder := json.NewEncoder(buffer)
- encoder.SetEscapeHTML(false)
- encoder.Encode(objMap)
- return bytes.TrimRight(buffer.Bytes(), "\n")
-}
-
-func (a *ActivityStreamsObject) Save(txn *badger.Txn) error {
- content := a.content()
- e := badger.NewEntry(a.keyName(), content)
- return txn.SetEntry(e)
-}
diff --git a/models/outbox_item.go b/models/outbox_item.go
new file mode 100644
index 0000000..ed74130
--- /dev/null
+++ b/models/outbox_item.go
@@ -0,0 +1,56 @@
+package models
+
+import (
+ "fmt"
+ "time"
+
+ "git.capotej.com/capotej/communique/config"
+ "github.com/dgraph-io/badger/v3"
+ "github.com/segmentio/ksuid"
+)
+
+type OutboxItem struct {
+ handler config.Handler
+ content []byte
+ id []byte
+ createdAt time.Time
+}
+
+// used for lookup purposes (count, collect)
+func NewOutboxItem(h config.Handler) *OutboxItem {
+ aso := &OutboxItem{handler: h}
+ return aso
+}
+
+func CreateOutboxItem(h config.Handler, content []byte) *OutboxItem {
+ t := time.Now()
+ k, _ := ksuid.NewRandomWithTime(t)
+ aso := &OutboxItem{
+ handler: h,
+ createdAt: t,
+ content: content,
+ id: k.Bytes(),
+ }
+ return aso
+}
+
+func (a *OutboxItem) keyName() []byte {
+ key := fmt.Sprintf("%s:%s", a.Keybase(), a.id)
+ return []byte(key)
+}
+
+func (a *OutboxItem) Keybase() string {
+ keyBase := fmt.Sprintf("outboxes:%s", a.handler.Name)
+ return keyBase
+}
+
+func (a *OutboxItem) Save(txn *badger.Txn) error {
+ if len(a.content) == 0 {
+ return fmt.Errorf("content not set")
+ }
+ if len(a.id) == 0 {
+ return fmt.Errorf("id not set")
+ }
+ e := badger.NewEntry(a.keyName(), a.content)
+ return txn.SetEntry(e)
+}
diff --git a/models/persister.go b/models/persister.go
index 3729c81..a639fc7 100644
--- a/models/persister.go
+++ b/models/persister.go
@@ -10,6 +10,11 @@ type Persister struct {
db *badger.DB
}
+type PersisterResult struct {
+ Value []byte
+ Key []byte
+}
+
func NewPersister(log *zap.SugaredLogger, db *badger.DB) *Persister {
aso := &Persister{log: log, db: db}
return aso
@@ -36,8 +41,8 @@ func (p *Persister) Count(model model) (int, error) {
return count, err
}
-func (p *Persister) Collect(model model) ([]string, error) {
- var result []string
+func (p *Persister) Collect(model model) ([]PersisterResult, error) {
+ var result []PersisterResult
err := p.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false // TODO Maybe we want true here
@@ -47,7 +52,8 @@ func (p *Persister) Collect(model model) ([]string, error) {
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
item.Value(func(v []byte) error {
- result = append(result, string(v))
+ pr := PersisterResult{Key: it.Item().Key(), Value: v}
+ result = append(result, pr)
return nil
})
}