aboutsummaryrefslogtreecommitdiff
path: root/models
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--models/activity_streams_object.go53
-rw-r--r--models/dedup_item.go14
-rw-r--r--models/model.go8
-rw-r--r--models/persister.go37
4 files changed, 112 insertions, 0 deletions
diff --git a/models/activity_streams_object.go b/models/activity_streams_object.go
new file mode 100644
index 0000000..85a7d09
--- /dev/null
+++ b/models/activity_streams_object.go
@@ -0,0 +1,53 @@
+package models
+
+import (
+ "fmt"
+ "time"
+
+ "git.capotej.com/capotej/communique/config"
+ "github.com/dgraph-io/badger/v3"
+ "github.com/go-fed/activity/streams/vocab"
+ "github.com/segmentio/ksuid"
+)
+
+// ActivityStreamsObject is our internal model for storing activity streams objects
+// For every object we receive, we create 2+N copies of it in Badger:
+// - the original object
+// - a copy for dedupe with an optional TTL (specified by handler response) TODO
+// - a temporary copy for every subscriber until it is delivered TODO
+type ActivityStreamsObject struct {
+ wrappedObject vocab.ActivityStreamsNote // OPTIONAL, not always wrapped (think .count)
+ handler config.Handler
+}
+
+func NewActivityStreamsObject(obj vocab.ActivityStreamsNote, h config.Handler) *ActivityStreamsObject {
+ aso := &ActivityStreamsObject{wrappedObject: obj, handler: h}
+ return aso
+}
+
+func (a *ActivityStreamsObject) keyName() []byte {
+ k, _ := ksuid.NewRandomWithTime(time.Now())
+ key := fmt.Sprintf("%s:%s", a.Keybase(), k.String())
+ return []byte(key)
+}
+
+func (a *ActivityStreamsObject) Keybase() string {
+ keyBase := fmt.Sprintf("outboxes:%s", a.handler.Name)
+ return keyBase
+}
+
+func (a *ActivityStreamsObject) content() []byte {
+ var contentVal string
+ if a.wrappedObject.GetActivityStreamsContent() != nil {
+ content := a.wrappedObject.GetActivityStreamsContent()
+ for contentIter := content.Begin(); contentIter != content.End(); contentIter = contentIter.Next() {
+ contentVal = contentIter.GetXMLSchemaString()
+ }
+ }
+ return []byte(contentVal)
+}
+
+func (a *ActivityStreamsObject) Save(txn *badger.Txn) error {
+ e := badger.NewEntry(a.keyName(), a.content())
+ return txn.SetEntry(e)
+}
diff --git a/models/dedup_item.go b/models/dedup_item.go
new file mode 100644
index 0000000..b079181
--- /dev/null
+++ b/models/dedup_item.go
@@ -0,0 +1,14 @@
+package models
+
+// func NewActivityStreamsObject(obj vocab.ActivityStreamsObject) *ActivityStreamsObject {
+// aso := &ActivityStreamsObject{wrappedObject: obj}
+// return aso
+// }
+
+// func (a *ActivityStreamsObject) Id() []byte {
+
+// }
+
+// func (a *ActivityStreamsObject) Value() []byte {
+
+// }
diff --git a/models/model.go b/models/model.go
new file mode 100644
index 0000000..56f1680
--- /dev/null
+++ b/models/model.go
@@ -0,0 +1,8 @@
+package models
+
+import "github.com/dgraph-io/badger/v3"
+
+type model interface {
+ Save(txn *badger.Txn) error
+ Keybase() string
+}
diff --git a/models/persister.go b/models/persister.go
new file mode 100644
index 0000000..71cddc8
--- /dev/null
+++ b/models/persister.go
@@ -0,0 +1,37 @@
+package models
+
+import (
+ "github.com/dgraph-io/badger/v3"
+ "go.uber.org/zap"
+)
+
+type Persister struct {
+ log *zap.SugaredLogger
+ db *badger.DB
+}
+
+func NewPersister(log *zap.SugaredLogger, db *badger.DB) *Persister {
+ aso := &Persister{log: log, db: db}
+ return aso
+}
+
+func (p *Persister) Store(model model) error {
+ err := p.db.Update(model.Save)
+ return err
+}
+
+func (p *Persister) Count(model model) (int, error) {
+ opts := badger.DefaultIteratorOptions
+ opts.PrefetchValues = false
+ var count int
+ err := p.db.View(func(txn *badger.Txn) error {
+ it := txn.NewIterator(opts)
+ defer it.Close()
+ prefix := []byte(model.Keybase())
+ for it.Seek(prefix); it.ValidForPrefix([]byte(prefix)); it.Next() {
+ count++
+ }
+ return nil
+ })
+ return count, err
+}