From a4288b06bf13210721c8f2fae64bc12c118f9041 Mon Sep 17 00:00:00 2001 From: Julio Capote Date: Thu, 29 Dec 2022 15:08:16 -0500 Subject: refactor --- main.go | 2 +- models/activity_streams_object.go | 53 ++++++++++++++++++ models/dedup_item.go | 14 +++++ models/model.go | 8 +++ models/persister.go | 37 +++++++++++++ registry/registry.go | 25 +++++---- resources/outbox.go | 111 -------------------------------------- resources/profile.go | 21 -------- resources/webfinger.go | 32 ----------- views/outbox.go | 94 ++++++++++++++++++++++++++++++++ views/profile.go | 21 ++++++++ views/webfinger.go | 32 +++++++++++ 12 files changed, 275 insertions(+), 175 deletions(-) create mode 100644 models/activity_streams_object.go create mode 100644 models/dedup_item.go create mode 100644 models/model.go create mode 100644 models/persister.go delete mode 100644 resources/outbox.go delete mode 100644 resources/profile.go delete mode 100644 resources/webfinger.go create mode 100644 views/outbox.go create mode 100644 views/profile.go create mode 100644 views/webfinger.go diff --git a/main.go b/main.go index 064ba93..fa81264 100644 --- a/main.go +++ b/main.go @@ -43,7 +43,7 @@ func main() { persister := models.NewPersister(log, db) // Registry - registry := registry.NewRegistry(cfg, db) + registry := registry.NewRegistry(cfg, persister) // Servers var mainWg sync.WaitGroup diff --git a/models/activity_streams_object.go b/models/activity_streams_object.go new file mode 100644 index 0000000..85a7d09 --- /dev/null +++ b/models/activity_streams_object.go @@ -0,0 +1,53 @@ +package models + +import ( + "fmt" + "time" + + "git.capotej.com/capotej/communique/config" + "github.com/dgraph-io/badger/v3" + "github.com/go-fed/activity/streams/vocab" + "github.com/segmentio/ksuid" +) + +// ActivityStreamsObject is our internal model for storing activity streams objects +// For every object we receive, we create 2+N copies of it in Badger: +// - the original object +// - a copy for dedupe with an optional TTL (specified by handler response) TODO +// - a temporary copy for every subscriber until it is delivered TODO +type ActivityStreamsObject struct { + wrappedObject vocab.ActivityStreamsNote // OPTIONAL, not always wrapped (think .count) + handler config.Handler +} + +func NewActivityStreamsObject(obj vocab.ActivityStreamsNote, h config.Handler) *ActivityStreamsObject { + aso := &ActivityStreamsObject{wrappedObject: obj, handler: h} + return aso +} + +func (a *ActivityStreamsObject) keyName() []byte { + k, _ := ksuid.NewRandomWithTime(time.Now()) + key := fmt.Sprintf("%s:%s", a.Keybase(), k.String()) + return []byte(key) +} + +func (a *ActivityStreamsObject) Keybase() string { + keyBase := fmt.Sprintf("outboxes:%s", a.handler.Name) + return keyBase +} + +func (a *ActivityStreamsObject) content() []byte { + var contentVal string + if a.wrappedObject.GetActivityStreamsContent() != nil { + content := a.wrappedObject.GetActivityStreamsContent() + for contentIter := content.Begin(); contentIter != content.End(); contentIter = contentIter.Next() { + contentVal = contentIter.GetXMLSchemaString() + } + } + return []byte(contentVal) +} + +func (a *ActivityStreamsObject) Save(txn *badger.Txn) error { + e := badger.NewEntry(a.keyName(), a.content()) + return txn.SetEntry(e) +} diff --git a/models/dedup_item.go b/models/dedup_item.go new file mode 100644 index 0000000..b079181 --- /dev/null +++ b/models/dedup_item.go @@ -0,0 +1,14 @@ +package models + +// func NewActivityStreamsObject(obj vocab.ActivityStreamsObject) *ActivityStreamsObject { +// aso := &ActivityStreamsObject{wrappedObject: obj} +// return aso +// } + +// func (a *ActivityStreamsObject) Id() []byte { + +// } + +// func (a *ActivityStreamsObject) Value() []byte { + +// } diff --git a/models/model.go b/models/model.go new file mode 100644 index 0000000..56f1680 --- /dev/null +++ b/models/model.go @@ -0,0 +1,8 @@ +package models + +import "github.com/dgraph-io/badger/v3" + +type model interface { + Save(txn *badger.Txn) error + Keybase() string +} diff --git a/models/persister.go b/models/persister.go new file mode 100644 index 0000000..71cddc8 --- /dev/null +++ b/models/persister.go @@ -0,0 +1,37 @@ +package models + +import ( + "github.com/dgraph-io/badger/v3" + "go.uber.org/zap" +) + +type Persister struct { + log *zap.SugaredLogger + db *badger.DB +} + +func NewPersister(log *zap.SugaredLogger, db *badger.DB) *Persister { + aso := &Persister{log: log, db: db} + return aso +} + +func (p *Persister) Store(model model) error { + err := p.db.Update(model.Save) + return err +} + +func (p *Persister) Count(model model) (int, error) { + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + var count int + err := p.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(opts) + defer it.Close() + prefix := []byte(model.Keybase()) + for it.Seek(prefix); it.ValidForPrefix([]byte(prefix)); it.Next() { + count++ + } + return nil + }) + return count, err +} diff --git a/registry/registry.go b/registry/registry.go index 4dbacc7..f57aede 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -4,8 +4,8 @@ import ( "fmt" "git.capotej.com/capotej/communique/config" - "git.capotej.com/capotej/communique/resources" - "github.com/dgraph-io/badger/v3" + "git.capotej.com/capotej/communique/models" + "git.capotej.com/capotej/communique/views" ) type Handler struct { @@ -14,12 +14,12 @@ type Handler struct { type Registry struct { cfg config.Config - db *badger.DB + persister *models.Persister handlerMap map[string]Handler } -func NewRegistry(cfg config.Config, db *badger.DB) *Registry { - reg := Registry{cfg: cfg, db: db} +func NewRegistry(cfg config.Config, persister *models.Persister) *Registry { + reg := Registry{cfg: cfg, persister: persister} reg.handlerMap = make(map[string]Handler) for _, v := range cfg.Handlers { reg.handlerMap[fqn(v.Name, cfg.Domain)] = Handler{handlerCfg: v} @@ -32,7 +32,7 @@ func (r *Registry) Profile(name string) (map[string]interface{}, error) { if handler == nil { return nil, nil } - return resources.RenderProfile(handler.handlerCfg.Name, r.cfg.Domain) + return views.RenderProfile(handler.handlerCfg.Name, r.cfg.Domain) } func (r *Registry) Outbox(name string) (map[string]interface{}, error) { @@ -40,7 +40,12 @@ func (r *Registry) Outbox(name string) (map[string]interface{}, error) { if handler == nil { return nil, nil } - return resources.RenderOutbox(handler.handlerCfg.Name, r.cfg.Domain, r.db) + aso := models.NewActivityStreamsObject(nil, handler.handlerCfg) + totalItems, err := r.persister.Count(aso) + if err != nil { + return nil, err + } + return views.RenderOutbox(handler.handlerCfg.Name, r.cfg.Domain, totalItems) } func (r *Registry) OutboxPage(name string) (map[string]interface{}, error) { @@ -48,10 +53,10 @@ func (r *Registry) OutboxPage(name string) (map[string]interface{}, error) { if handler == nil { return nil, nil } - return resources.RenderOutboxPage(handler.handlerCfg.Name, r.cfg.Domain, r.db) + return views.RenderOutboxPage(handler.handlerCfg.Name, r.cfg.Domain) } -func (r *Registry) WebfingerResource(fqn string) (*resources.WebfingerResource, error) { +func (r *Registry) WebfingerResource(fqn string) (*views.WebfingerResource, error) { handler := r.findByFQN(fqn) if handler == nil { handler = r.findByFQN("acct:" + fqn) @@ -59,7 +64,7 @@ func (r *Registry) WebfingerResource(fqn string) (*resources.WebfingerResource, if handler == nil { return nil, nil } - return resources.RenderWebfingerResource(handler.handlerCfg.Name, r.cfg.Domain) + return views.RenderWebfingerResource(handler.handlerCfg.Name, r.cfg.Domain) } func fqn(name, domain string) string { diff --git a/resources/outbox.go b/resources/outbox.go deleted file mode 100644 index e7004c4..0000000 --- a/resources/outbox.go +++ /dev/null @@ -1,111 +0,0 @@ -package resources - -import ( - "git.capotej.com/capotej/communique/urls" - "github.com/dgraph-io/badger/v3" - "github.com/go-fed/activity/streams" -) - -func RenderOutboxPage(name, domain string, db *badger.DB) (map[string]interface{}, error) { - id, err := urls.UrlOutboxPage(name, domain) - if err != nil { - return nil, err - } - - partOf, err := urls.UrlOutbox(name, domain) - if err != nil { - return nil, err - } - - oc := streams.NewActivityStreamsOrderedCollectionPage() - - idProp := streams.NewJSONLDIdProperty() - idProp.Set(id) - oc.SetJSONLDId(idProp) - - partOfProp := streams.NewActivityStreamsPartOfProperty() - partOfProp.SetIRI(partOf) - oc.SetActivityStreamsPartOf(partOfProp) - - itemsProp := streams.NewActivityStreamsOrderedItemsProperty() - - err = db.View(func(txn *badger.Txn) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false - it := txn.NewIterator(opts) - defer it.Close() - prefix := []byte("outbox:sample") // TODO - for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { - item := it.Item() - err := item.Value(func(v []byte) error { - crea := streams.NewActivityStreamsCreate() - obj := streams.NewActivityStreamsObjectProperty() - crea.SetActivityStreamsObject(obj) - - note := streams.NewActivityStreamsNote() - contentProp := streams.NewActivityStreamsContentProperty() - contentProp.AppendXMLSchemaString(string(v)) - note.SetActivityStreamsContent(contentProp) - obj.AppendActivityStreamsNote(note) - - itemsProp.AppendActivityStreamsCreate(crea) - return nil - }) - if err != nil { - return err - } - } - return nil - }) - if err != nil { - return nil, err - } - - oc.SetActivityStreamsOrderedItems(itemsProp) - return streams.Serialize(oc) -} - -func RenderOutbox(name, domain string, db *badger.DB) (map[string]interface{}, error) { - id, err := urls.UrlOutbox(name, domain) - - if err != nil { - return nil, err - } - - first, err := urls.UrlOutboxPage(name, domain) - if err != nil { - return nil, err - } - - oc := streams.NewActivityStreamsOrderedCollection() - - idProp := streams.NewJSONLDIdProperty() - idProp.Set(id) - oc.SetJSONLDId(idProp) - - var i int - // err = db.View(func(txn *badger.Txn) error { - // opts := badger.DefaultIteratorOptions - // opts.PrefetchValues = false - // it := txn.NewIterator(opts) - // defer it.Close() - // prefix := []byte("outbox:sample") // TODO - // for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { - // i++ - // } - // return nil - // }) - // if err != nil { - // return nil, err - // } - - itemsProp := streams.NewActivityStreamsTotalItemsProperty() - itemsProp.Set(i) - oc.SetActivityStreamsTotalItems(itemsProp) - - ocProp := streams.NewActivityStreamsFirstProperty() - ocProp.SetIRI(first) - oc.SetActivityStreamsFirst(ocProp) - - return streams.Serialize(oc) -} diff --git a/resources/profile.go b/resources/profile.go deleted file mode 100644 index 971b11c..0000000 --- a/resources/profile.go +++ /dev/null @@ -1,21 +0,0 @@ -package resources - -import ( - "git.capotej.com/capotej/communique/urls" - "github.com/go-fed/activity/streams" -) - -func RenderProfile(name, domain string) (map[string]interface{}, error) { - u, err := urls.UrlInbox(name, domain) - if err != nil { - return nil, err - } - - inb := streams.NewActivityStreamsInboxProperty() - inb.SetIRI(u) - - p := streams.NewActivityStreamsService() - p.SetActivityStreamsInbox(inb) - - return streams.Serialize(p) -} diff --git a/resources/webfinger.go b/resources/webfinger.go deleted file mode 100644 index e7c47ea..0000000 --- a/resources/webfinger.go +++ /dev/null @@ -1,32 +0,0 @@ -package resources - -import ( - "fmt" - "path" -) - -type Link struct { - Rel string `json:"rel"` - Type string `json:"type"` - Href string `json:"href"` -} - -type WebfingerResource struct { - Subject string `json:"subject"` - Aliases []string `json:"aliases"` - Links []Link `json:"links"` -} - -func RenderWebfingerResource(name, domain string) (*WebfingerResource, error) { - rs := WebfingerResource{ - // TODO clean up - Subject: fmt.Sprintf("acct:%s@%s", name, domain), - Aliases: []string{}, - Links: []Link{{ - Rel: "self", - Href: path.Join("https://", domain, "actors", name), - Type: "application/activity+json", - }}, - } - return &rs, nil -} diff --git a/views/outbox.go b/views/outbox.go new file mode 100644 index 0000000..fe87e24 --- /dev/null +++ b/views/outbox.go @@ -0,0 +1,94 @@ +package views + +import ( + "git.capotej.com/capotej/communique/urls" + "github.com/go-fed/activity/streams" +) + +func RenderOutboxPage(name, domain string) (map[string]interface{}, error) { + id, err := urls.UrlOutboxPage(name, domain) + if err != nil { + return nil, err + } + + partOf, err := urls.UrlOutbox(name, domain) + if err != nil { + return nil, err + } + + oc := streams.NewActivityStreamsOrderedCollectionPage() + + idProp := streams.NewJSONLDIdProperty() + idProp.Set(id) + oc.SetJSONLDId(idProp) + + partOfProp := streams.NewActivityStreamsPartOfProperty() + partOfProp.SetIRI(partOf) + oc.SetActivityStreamsPartOf(partOfProp) + + itemsProp := streams.NewActivityStreamsOrderedItemsProperty() + + // err = db.View(func(txn *badger.Txn) error { + // opts := badger.DefaultIteratorOptions + // opts.PrefetchValues = false + // it := txn.NewIterator(opts) + // defer it.Close() + // prefix := []byte("outbox:sample") // TODO + // for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + // item := it.Item() + // err := item.Value(func(v []byte) error { + // crea := streams.NewActivityStreamsCreate() + // obj := streams.NewActivityStreamsObjectProperty() + // crea.SetActivityStreamsObject(obj) + + // note := streams.NewActivityStreamsNote() + // contentProp := streams.NewActivityStreamsContentProperty() + // contentProp.AppendXMLSchemaString(string(v)) + // note.SetActivityStreamsContent(contentProp) + // obj.AppendActivityStreamsNote(note) + + // itemsProp.AppendActivityStreamsCreate(crea) + // return nil + // }) + // if err != nil { + // return err + // } + // } + // return nil + // }) + // if err != nil { + // return nil, err + // } + + oc.SetActivityStreamsOrderedItems(itemsProp) + return streams.Serialize(oc) +} + +func RenderOutbox(name, domain string, totalItems int) (map[string]interface{}, error) { + id, err := urls.UrlOutbox(name, domain) + + if err != nil { + return nil, err + } + + first, err := urls.UrlOutboxPage(name, domain) + if err != nil { + return nil, err + } + + oc := streams.NewActivityStreamsOrderedCollection() + + idProp := streams.NewJSONLDIdProperty() + idProp.Set(id) + oc.SetJSONLDId(idProp) + + itemsProp := streams.NewActivityStreamsTotalItemsProperty() + itemsProp.Set(totalItems) + oc.SetActivityStreamsTotalItems(itemsProp) + + ocProp := streams.NewActivityStreamsFirstProperty() + ocProp.SetIRI(first) + oc.SetActivityStreamsFirst(ocProp) + + return streams.Serialize(oc) +} diff --git a/views/profile.go b/views/profile.go new file mode 100644 index 0000000..12aae54 --- /dev/null +++ b/views/profile.go @@ -0,0 +1,21 @@ +package views + +import ( + "git.capotej.com/capotej/communique/urls" + "github.com/go-fed/activity/streams" +) + +func RenderProfile(name, domain string) (map[string]interface{}, error) { + u, err := urls.UrlInbox(name, domain) + if err != nil { + return nil, err + } + + inb := streams.NewActivityStreamsInboxProperty() + inb.SetIRI(u) + + p := streams.NewActivityStreamsService() + p.SetActivityStreamsInbox(inb) + + return streams.Serialize(p) +} diff --git a/views/webfinger.go b/views/webfinger.go new file mode 100644 index 0000000..4252bae --- /dev/null +++ b/views/webfinger.go @@ -0,0 +1,32 @@ +package views + +import ( + "fmt" + "path" +) + +type Link struct { + Rel string `json:"rel"` + Type string `json:"type"` + Href string `json:"href"` +} + +type WebfingerResource struct { + Subject string `json:"subject"` + Aliases []string `json:"aliases"` + Links []Link `json:"links"` +} + +func RenderWebfingerResource(name, domain string) (*WebfingerResource, error) { + rs := WebfingerResource{ + // TODO clean up + Subject: fmt.Sprintf("acct:%s@%s", name, domain), + Aliases: []string{}, + Links: []Link{{ + Rel: "self", + Href: path.Join("https://", domain, "actors", name), + Type: "application/activity+json", + }}, + } + return &rs, nil +} -- cgit v1.2.3