diff options
-rw-r--r-- | main.go | 2 | ||||
-rw-r--r-- | models/activity_streams_object.go | 53 | ||||
-rw-r--r-- | models/dedup_item.go | 14 | ||||
-rw-r--r-- | models/model.go | 8 | ||||
-rw-r--r-- | models/persister.go | 37 | ||||
-rw-r--r-- | registry/registry.go | 25 | ||||
-rw-r--r-- | views/outbox.go (renamed from resources/outbox.go) | 87 | ||||
-rw-r--r-- | views/profile.go (renamed from resources/profile.go) | 2 | ||||
-rw-r--r-- | views/webfinger.go (renamed from resources/webfinger.go) | 2 |
9 files changed, 165 insertions, 65 deletions
@@ -43,7 +43,7 @@ func main() { persister := models.NewPersister(log, db) // Registry - registry := registry.NewRegistry(cfg, db) + registry := registry.NewRegistry(cfg, persister) // Servers var mainWg sync.WaitGroup diff --git a/models/activity_streams_object.go b/models/activity_streams_object.go new file mode 100644 index 0000000..85a7d09 --- /dev/null +++ b/models/activity_streams_object.go @@ -0,0 +1,53 @@ +package models + +import ( + "fmt" + "time" + + "git.capotej.com/capotej/communique/config" + "github.com/dgraph-io/badger/v3" + "github.com/go-fed/activity/streams/vocab" + "github.com/segmentio/ksuid" +) + +// ActivityStreamsObject is our internal model for storing activity streams objects +// For every object we receive, we create 2+N copies of it in Badger: +// - the original object +// - a copy for dedupe with an optional TTL (specified by handler response) TODO +// - a temporary copy for every subscriber until it is delivered TODO +type ActivityStreamsObject struct { + wrappedObject vocab.ActivityStreamsNote // OPTIONAL, not always wrapped (think .count) + handler config.Handler +} + +func NewActivityStreamsObject(obj vocab.ActivityStreamsNote, h config.Handler) *ActivityStreamsObject { + aso := &ActivityStreamsObject{wrappedObject: obj, handler: h} + return aso +} + +func (a *ActivityStreamsObject) keyName() []byte { + k, _ := ksuid.NewRandomWithTime(time.Now()) + key := fmt.Sprintf("%s:%s", a.Keybase(), k.String()) + return []byte(key) +} + +func (a *ActivityStreamsObject) Keybase() string { + keyBase := fmt.Sprintf("outboxes:%s", a.handler.Name) + return keyBase +} + +func (a *ActivityStreamsObject) content() []byte { + var contentVal string + if a.wrappedObject.GetActivityStreamsContent() != nil { + content := a.wrappedObject.GetActivityStreamsContent() + for contentIter := content.Begin(); contentIter != content.End(); contentIter = contentIter.Next() { + contentVal = contentIter.GetXMLSchemaString() + } + } + return []byte(contentVal) +} + +func (a *ActivityStreamsObject) Save(txn *badger.Txn) error { + e := badger.NewEntry(a.keyName(), a.content()) + return txn.SetEntry(e) +} diff --git a/models/dedup_item.go b/models/dedup_item.go new file mode 100644 index 0000000..b079181 --- /dev/null +++ b/models/dedup_item.go @@ -0,0 +1,14 @@ +package models + +// func NewActivityStreamsObject(obj vocab.ActivityStreamsObject) *ActivityStreamsObject { +// aso := &ActivityStreamsObject{wrappedObject: obj} +// return aso +// } + +// func (a *ActivityStreamsObject) Id() []byte { + +// } + +// func (a *ActivityStreamsObject) Value() []byte { + +// } diff --git a/models/model.go b/models/model.go new file mode 100644 index 0000000..56f1680 --- /dev/null +++ b/models/model.go @@ -0,0 +1,8 @@ +package models + +import "github.com/dgraph-io/badger/v3" + +type model interface { + Save(txn *badger.Txn) error + Keybase() string +} diff --git a/models/persister.go b/models/persister.go new file mode 100644 index 0000000..71cddc8 --- /dev/null +++ b/models/persister.go @@ -0,0 +1,37 @@ +package models + +import ( + "github.com/dgraph-io/badger/v3" + "go.uber.org/zap" +) + +type Persister struct { + log *zap.SugaredLogger + db *badger.DB +} + +func NewPersister(log *zap.SugaredLogger, db *badger.DB) *Persister { + aso := &Persister{log: log, db: db} + return aso +} + +func (p *Persister) Store(model model) error { + err := p.db.Update(model.Save) + return err +} + +func (p *Persister) Count(model model) (int, error) { + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + var count int + err := p.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(opts) + defer it.Close() + prefix := []byte(model.Keybase()) + for it.Seek(prefix); it.ValidForPrefix([]byte(prefix)); it.Next() { + count++ + } + return nil + }) + return count, err +} diff --git a/registry/registry.go b/registry/registry.go index 4dbacc7..f57aede 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -4,8 +4,8 @@ import ( "fmt" "git.capotej.com/capotej/communique/config" - "git.capotej.com/capotej/communique/resources" - "github.com/dgraph-io/badger/v3" + "git.capotej.com/capotej/communique/models" + "git.capotej.com/capotej/communique/views" ) type Handler struct { @@ -14,12 +14,12 @@ type Handler struct { type Registry struct { cfg config.Config - db *badger.DB + persister *models.Persister handlerMap map[string]Handler } -func NewRegistry(cfg config.Config, db *badger.DB) *Registry { - reg := Registry{cfg: cfg, db: db} +func NewRegistry(cfg config.Config, persister *models.Persister) *Registry { + reg := Registry{cfg: cfg, persister: persister} reg.handlerMap = make(map[string]Handler) for _, v := range cfg.Handlers { reg.handlerMap[fqn(v.Name, cfg.Domain)] = Handler{handlerCfg: v} @@ -32,7 +32,7 @@ func (r *Registry) Profile(name string) (map[string]interface{}, error) { if handler == nil { return nil, nil } - return resources.RenderProfile(handler.handlerCfg.Name, r.cfg.Domain) + return views.RenderProfile(handler.handlerCfg.Name, r.cfg.Domain) } func (r *Registry) Outbox(name string) (map[string]interface{}, error) { @@ -40,7 +40,12 @@ func (r *Registry) Outbox(name string) (map[string]interface{}, error) { if handler == nil { return nil, nil } - return resources.RenderOutbox(handler.handlerCfg.Name, r.cfg.Domain, r.db) + aso := models.NewActivityStreamsObject(nil, handler.handlerCfg) + totalItems, err := r.persister.Count(aso) + if err != nil { + return nil, err + } + return views.RenderOutbox(handler.handlerCfg.Name, r.cfg.Domain, totalItems) } func (r *Registry) OutboxPage(name string) (map[string]interface{}, error) { @@ -48,10 +53,10 @@ func (r *Registry) OutboxPage(name string) (map[string]interface{}, error) { if handler == nil { return nil, nil } - return resources.RenderOutboxPage(handler.handlerCfg.Name, r.cfg.Domain, r.db) + return views.RenderOutboxPage(handler.handlerCfg.Name, r.cfg.Domain) } -func (r *Registry) WebfingerResource(fqn string) (*resources.WebfingerResource, error) { +func (r *Registry) WebfingerResource(fqn string) (*views.WebfingerResource, error) { handler := r.findByFQN(fqn) if handler == nil { handler = r.findByFQN("acct:" + fqn) @@ -59,7 +64,7 @@ func (r *Registry) WebfingerResource(fqn string) (*resources.WebfingerResource, if handler == nil { return nil, nil } - return resources.RenderWebfingerResource(handler.handlerCfg.Name, r.cfg.Domain) + return views.RenderWebfingerResource(handler.handlerCfg.Name, r.cfg.Domain) } func fqn(name, domain string) string { diff --git a/resources/outbox.go b/views/outbox.go index e7004c4..fe87e24 100644 --- a/resources/outbox.go +++ b/views/outbox.go @@ -1,12 +1,11 @@ -package resources +package views import ( "git.capotej.com/capotej/communique/urls" - "github.com/dgraph-io/badger/v3" "github.com/go-fed/activity/streams" ) -func RenderOutboxPage(name, domain string, db *badger.DB) (map[string]interface{}, error) { +func RenderOutboxPage(name, domain string) (map[string]interface{}, error) { id, err := urls.UrlOutboxPage(name, domain) if err != nil { return nil, err @@ -29,43 +28,43 @@ func RenderOutboxPage(name, domain string, db *badger.DB) (map[string]interface{ itemsProp := streams.NewActivityStreamsOrderedItemsProperty() - err = db.View(func(txn *badger.Txn) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false - it := txn.NewIterator(opts) - defer it.Close() - prefix := []byte("outbox:sample") // TODO - for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { - item := it.Item() - err := item.Value(func(v []byte) error { - crea := streams.NewActivityStreamsCreate() - obj := streams.NewActivityStreamsObjectProperty() - crea.SetActivityStreamsObject(obj) - - note := streams.NewActivityStreamsNote() - contentProp := streams.NewActivityStreamsContentProperty() - contentProp.AppendXMLSchemaString(string(v)) - note.SetActivityStreamsContent(contentProp) - obj.AppendActivityStreamsNote(note) - - itemsProp.AppendActivityStreamsCreate(crea) - return nil - }) - if err != nil { - return err - } - } - return nil - }) - if err != nil { - return nil, err - } + // err = db.View(func(txn *badger.Txn) error { + // opts := badger.DefaultIteratorOptions + // opts.PrefetchValues = false + // it := txn.NewIterator(opts) + // defer it.Close() + // prefix := []byte("outbox:sample") // TODO + // for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + // item := it.Item() + // err := item.Value(func(v []byte) error { + // crea := streams.NewActivityStreamsCreate() + // obj := streams.NewActivityStreamsObjectProperty() + // crea.SetActivityStreamsObject(obj) + + // note := streams.NewActivityStreamsNote() + // contentProp := streams.NewActivityStreamsContentProperty() + // contentProp.AppendXMLSchemaString(string(v)) + // note.SetActivityStreamsContent(contentProp) + // obj.AppendActivityStreamsNote(note) + + // itemsProp.AppendActivityStreamsCreate(crea) + // return nil + // }) + // if err != nil { + // return err + // } + // } + // return nil + // }) + // if err != nil { + // return nil, err + // } oc.SetActivityStreamsOrderedItems(itemsProp) return streams.Serialize(oc) } -func RenderOutbox(name, domain string, db *badger.DB) (map[string]interface{}, error) { +func RenderOutbox(name, domain string, totalItems int) (map[string]interface{}, error) { id, err := urls.UrlOutbox(name, domain) if err != nil { @@ -83,24 +82,8 @@ func RenderOutbox(name, domain string, db *badger.DB) (map[string]interface{}, e idProp.Set(id) oc.SetJSONLDId(idProp) - var i int - // err = db.View(func(txn *badger.Txn) error { - // opts := badger.DefaultIteratorOptions - // opts.PrefetchValues = false - // it := txn.NewIterator(opts) - // defer it.Close() - // prefix := []byte("outbox:sample") // TODO - // for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { - // i++ - // } - // return nil - // }) - // if err != nil { - // return nil, err - // } - itemsProp := streams.NewActivityStreamsTotalItemsProperty() - itemsProp.Set(i) + itemsProp.Set(totalItems) oc.SetActivityStreamsTotalItems(itemsProp) ocProp := streams.NewActivityStreamsFirstProperty() diff --git a/resources/profile.go b/views/profile.go index 971b11c..12aae54 100644 --- a/resources/profile.go +++ b/views/profile.go @@ -1,4 +1,4 @@ -package resources +package views import ( "git.capotej.com/capotej/communique/urls" diff --git a/resources/webfinger.go b/views/webfinger.go index e7c47ea..4252bae 100644 --- a/resources/webfinger.go +++ b/views/webfinger.go @@ -1,4 +1,4 @@ -package resources +package views import ( "fmt" |