aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--main.go2
-rw-r--r--models/activity_streams_object.go53
-rw-r--r--models/dedup_item.go14
-rw-r--r--models/model.go8
-rw-r--r--models/persister.go37
-rw-r--r--registry/registry.go25
-rw-r--r--views/outbox.go (renamed from resources/outbox.go)87
-rw-r--r--views/profile.go (renamed from resources/profile.go)2
-rw-r--r--views/webfinger.go (renamed from resources/webfinger.go)2
9 files changed, 165 insertions, 65 deletions
diff --git a/main.go b/main.go
index 064ba93..fa81264 100644
--- a/main.go
+++ b/main.go
@@ -43,7 +43,7 @@ func main() {
persister := models.NewPersister(log, db)
// Registry
- registry := registry.NewRegistry(cfg, db)
+ registry := registry.NewRegistry(cfg, persister)
// Servers
var mainWg sync.WaitGroup
diff --git a/models/activity_streams_object.go b/models/activity_streams_object.go
new file mode 100644
index 0000000..85a7d09
--- /dev/null
+++ b/models/activity_streams_object.go
@@ -0,0 +1,53 @@
+package models
+
+import (
+ "fmt"
+ "time"
+
+ "git.capotej.com/capotej/communique/config"
+ "github.com/dgraph-io/badger/v3"
+ "github.com/go-fed/activity/streams/vocab"
+ "github.com/segmentio/ksuid"
+)
+
+// ActivityStreamsObject is our internal model for storing activity streams objects
+// For every object we receive, we create 2+N copies of it in Badger:
+// - the original object
+// - a copy for dedupe with an optional TTL (specified by handler response) TODO
+// - a temporary copy for every subscriber until it is delivered TODO
+type ActivityStreamsObject struct {
+ wrappedObject vocab.ActivityStreamsNote // OPTIONAL, not always wrapped (think .count)
+ handler config.Handler
+}
+
+func NewActivityStreamsObject(obj vocab.ActivityStreamsNote, h config.Handler) *ActivityStreamsObject {
+ aso := &ActivityStreamsObject{wrappedObject: obj, handler: h}
+ return aso
+}
+
+func (a *ActivityStreamsObject) keyName() []byte {
+ k, _ := ksuid.NewRandomWithTime(time.Now())
+ key := fmt.Sprintf("%s:%s", a.Keybase(), k.String())
+ return []byte(key)
+}
+
+func (a *ActivityStreamsObject) Keybase() string {
+ keyBase := fmt.Sprintf("outboxes:%s", a.handler.Name)
+ return keyBase
+}
+
+func (a *ActivityStreamsObject) content() []byte {
+ var contentVal string
+ if a.wrappedObject.GetActivityStreamsContent() != nil {
+ content := a.wrappedObject.GetActivityStreamsContent()
+ for contentIter := content.Begin(); contentIter != content.End(); contentIter = contentIter.Next() {
+ contentVal = contentIter.GetXMLSchemaString()
+ }
+ }
+ return []byte(contentVal)
+}
+
+func (a *ActivityStreamsObject) Save(txn *badger.Txn) error {
+ e := badger.NewEntry(a.keyName(), a.content())
+ return txn.SetEntry(e)
+}
diff --git a/models/dedup_item.go b/models/dedup_item.go
new file mode 100644
index 0000000..b079181
--- /dev/null
+++ b/models/dedup_item.go
@@ -0,0 +1,14 @@
+package models
+
+// func NewActivityStreamsObject(obj vocab.ActivityStreamsObject) *ActivityStreamsObject {
+// aso := &ActivityStreamsObject{wrappedObject: obj}
+// return aso
+// }
+
+// func (a *ActivityStreamsObject) Id() []byte {
+
+// }
+
+// func (a *ActivityStreamsObject) Value() []byte {
+
+// }
diff --git a/models/model.go b/models/model.go
new file mode 100644
index 0000000..56f1680
--- /dev/null
+++ b/models/model.go
@@ -0,0 +1,8 @@
+package models
+
+import "github.com/dgraph-io/badger/v3"
+
+type model interface {
+ Save(txn *badger.Txn) error
+ Keybase() string
+}
diff --git a/models/persister.go b/models/persister.go
new file mode 100644
index 0000000..71cddc8
--- /dev/null
+++ b/models/persister.go
@@ -0,0 +1,37 @@
+package models
+
+import (
+ "github.com/dgraph-io/badger/v3"
+ "go.uber.org/zap"
+)
+
+type Persister struct {
+ log *zap.SugaredLogger
+ db *badger.DB
+}
+
+func NewPersister(log *zap.SugaredLogger, db *badger.DB) *Persister {
+ aso := &Persister{log: log, db: db}
+ return aso
+}
+
+func (p *Persister) Store(model model) error {
+ err := p.db.Update(model.Save)
+ return err
+}
+
+func (p *Persister) Count(model model) (int, error) {
+ opts := badger.DefaultIteratorOptions
+ opts.PrefetchValues = false
+ var count int
+ err := p.db.View(func(txn *badger.Txn) error {
+ it := txn.NewIterator(opts)
+ defer it.Close()
+ prefix := []byte(model.Keybase())
+ for it.Seek(prefix); it.ValidForPrefix([]byte(prefix)); it.Next() {
+ count++
+ }
+ return nil
+ })
+ return count, err
+}
diff --git a/registry/registry.go b/registry/registry.go
index 4dbacc7..f57aede 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -4,8 +4,8 @@ import (
"fmt"
"git.capotej.com/capotej/communique/config"
- "git.capotej.com/capotej/communique/resources"
- "github.com/dgraph-io/badger/v3"
+ "git.capotej.com/capotej/communique/models"
+ "git.capotej.com/capotej/communique/views"
)
type Handler struct {
@@ -14,12 +14,12 @@ type Handler struct {
type Registry struct {
cfg config.Config
- db *badger.DB
+ persister *models.Persister
handlerMap map[string]Handler
}
-func NewRegistry(cfg config.Config, db *badger.DB) *Registry {
- reg := Registry{cfg: cfg, db: db}
+func NewRegistry(cfg config.Config, persister *models.Persister) *Registry {
+ reg := Registry{cfg: cfg, persister: persister}
reg.handlerMap = make(map[string]Handler)
for _, v := range cfg.Handlers {
reg.handlerMap[fqn(v.Name, cfg.Domain)] = Handler{handlerCfg: v}
@@ -32,7 +32,7 @@ func (r *Registry) Profile(name string) (map[string]interface{}, error) {
if handler == nil {
return nil, nil
}
- return resources.RenderProfile(handler.handlerCfg.Name, r.cfg.Domain)
+ return views.RenderProfile(handler.handlerCfg.Name, r.cfg.Domain)
}
func (r *Registry) Outbox(name string) (map[string]interface{}, error) {
@@ -40,7 +40,12 @@ func (r *Registry) Outbox(name string) (map[string]interface{}, error) {
if handler == nil {
return nil, nil
}
- return resources.RenderOutbox(handler.handlerCfg.Name, r.cfg.Domain, r.db)
+ aso := models.NewActivityStreamsObject(nil, handler.handlerCfg)
+ totalItems, err := r.persister.Count(aso)
+ if err != nil {
+ return nil, err
+ }
+ return views.RenderOutbox(handler.handlerCfg.Name, r.cfg.Domain, totalItems)
}
func (r *Registry) OutboxPage(name string) (map[string]interface{}, error) {
@@ -48,10 +53,10 @@ func (r *Registry) OutboxPage(name string) (map[string]interface{}, error) {
if handler == nil {
return nil, nil
}
- return resources.RenderOutboxPage(handler.handlerCfg.Name, r.cfg.Domain, r.db)
+ return views.RenderOutboxPage(handler.handlerCfg.Name, r.cfg.Domain)
}
-func (r *Registry) WebfingerResource(fqn string) (*resources.WebfingerResource, error) {
+func (r *Registry) WebfingerResource(fqn string) (*views.WebfingerResource, error) {
handler := r.findByFQN(fqn)
if handler == nil {
handler = r.findByFQN("acct:" + fqn)
@@ -59,7 +64,7 @@ func (r *Registry) WebfingerResource(fqn string) (*resources.WebfingerResource,
if handler == nil {
return nil, nil
}
- return resources.RenderWebfingerResource(handler.handlerCfg.Name, r.cfg.Domain)
+ return views.RenderWebfingerResource(handler.handlerCfg.Name, r.cfg.Domain)
}
func fqn(name, domain string) string {
diff --git a/resources/outbox.go b/views/outbox.go
index e7004c4..fe87e24 100644
--- a/resources/outbox.go
+++ b/views/outbox.go
@@ -1,12 +1,11 @@
-package resources
+package views
import (
"git.capotej.com/capotej/communique/urls"
- "github.com/dgraph-io/badger/v3"
"github.com/go-fed/activity/streams"
)
-func RenderOutboxPage(name, domain string, db *badger.DB) (map[string]interface{}, error) {
+func RenderOutboxPage(name, domain string) (map[string]interface{}, error) {
id, err := urls.UrlOutboxPage(name, domain)
if err != nil {
return nil, err
@@ -29,43 +28,43 @@ func RenderOutboxPage(name, domain string, db *badger.DB) (map[string]interface{
itemsProp := streams.NewActivityStreamsOrderedItemsProperty()
- err = db.View(func(txn *badger.Txn) error {
- opts := badger.DefaultIteratorOptions
- opts.PrefetchValues = false
- it := txn.NewIterator(opts)
- defer it.Close()
- prefix := []byte("outbox:sample") // TODO
- for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
- item := it.Item()
- err := item.Value(func(v []byte) error {
- crea := streams.NewActivityStreamsCreate()
- obj := streams.NewActivityStreamsObjectProperty()
- crea.SetActivityStreamsObject(obj)
-
- note := streams.NewActivityStreamsNote()
- contentProp := streams.NewActivityStreamsContentProperty()
- contentProp.AppendXMLSchemaString(string(v))
- note.SetActivityStreamsContent(contentProp)
- obj.AppendActivityStreamsNote(note)
-
- itemsProp.AppendActivityStreamsCreate(crea)
- return nil
- })
- if err != nil {
- return err
- }
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
+ // err = db.View(func(txn *badger.Txn) error {
+ // opts := badger.DefaultIteratorOptions
+ // opts.PrefetchValues = false
+ // it := txn.NewIterator(opts)
+ // defer it.Close()
+ // prefix := []byte("outbox:sample") // TODO
+ // for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
+ // item := it.Item()
+ // err := item.Value(func(v []byte) error {
+ // crea := streams.NewActivityStreamsCreate()
+ // obj := streams.NewActivityStreamsObjectProperty()
+ // crea.SetActivityStreamsObject(obj)
+
+ // note := streams.NewActivityStreamsNote()
+ // contentProp := streams.NewActivityStreamsContentProperty()
+ // contentProp.AppendXMLSchemaString(string(v))
+ // note.SetActivityStreamsContent(contentProp)
+ // obj.AppendActivityStreamsNote(note)
+
+ // itemsProp.AppendActivityStreamsCreate(crea)
+ // return nil
+ // })
+ // if err != nil {
+ // return err
+ // }
+ // }
+ // return nil
+ // })
+ // if err != nil {
+ // return nil, err
+ // }
oc.SetActivityStreamsOrderedItems(itemsProp)
return streams.Serialize(oc)
}
-func RenderOutbox(name, domain string, db *badger.DB) (map[string]interface{}, error) {
+func RenderOutbox(name, domain string, totalItems int) (map[string]interface{}, error) {
id, err := urls.UrlOutbox(name, domain)
if err != nil {
@@ -83,24 +82,8 @@ func RenderOutbox(name, domain string, db *badger.DB) (map[string]interface{}, e
idProp.Set(id)
oc.SetJSONLDId(idProp)
- var i int
- // err = db.View(func(txn *badger.Txn) error {
- // opts := badger.DefaultIteratorOptions
- // opts.PrefetchValues = false
- // it := txn.NewIterator(opts)
- // defer it.Close()
- // prefix := []byte("outbox:sample") // TODO
- // for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
- // i++
- // }
- // return nil
- // })
- // if err != nil {
- // return nil, err
- // }
-
itemsProp := streams.NewActivityStreamsTotalItemsProperty()
- itemsProp.Set(i)
+ itemsProp.Set(totalItems)
oc.SetActivityStreamsTotalItems(itemsProp)
ocProp := streams.NewActivityStreamsFirstProperty()
diff --git a/resources/profile.go b/views/profile.go
index 971b11c..12aae54 100644
--- a/resources/profile.go
+++ b/views/profile.go
@@ -1,4 +1,4 @@
-package resources
+package views
import (
"git.capotej.com/capotej/communique/urls"
diff --git a/resources/webfinger.go b/views/webfinger.go
index e7c47ea..4252bae 100644
--- a/resources/webfinger.go
+++ b/views/webfinger.go
@@ -1,4 +1,4 @@
-package resources
+package views
import (
"fmt"