From ca12c0d26c987e67deade02bdf645fda8af30016 Mon Sep 17 00:00:00 2001 From: Julio Capote Date: Fri, 6 Jan 2023 13:48:12 -0500 Subject: start of subscriptions --- main.go | 1 + models/persister.go | 10 +++++++++ models/subscription.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++ registry/registry.go | 32 +++++++++++++++++++--------- 4 files changed, 90 insertions(+), 10 deletions(-) create mode 100644 models/subscription.go diff --git a/main.go b/main.go index e5f530a..9e3c6f3 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ func main() { // Config var cfg config.Config //TODO use a flag here + //TODO ensure that handler names only contain AZa-z and '-' _, err := toml.DecodeFile("sample-config.toml", &cfg) if err != nil { log.Fatal(err) diff --git a/models/persister.go b/models/persister.go index c6e0c72..5f79e4e 100644 --- a/models/persister.go +++ b/models/persister.go @@ -43,6 +43,16 @@ func (p *Persister) Store(model model) error { return err } +func (p *Persister) Delete(model model) error { + log := p.log.With("model", model.Name()).With("DedupKey", model.DedupKey()).With("Key", model.Key()) + log.Debug("Delete()") + err := p.db.Update(func(txn *badger.Txn) error { + txn.Delete([]byte(model.Key())) + return nil + }) + return err +} + func (p *Persister) Count(model model) (int, error) { opts := badger.DefaultIteratorOptions opts.PrefetchValues = false diff --git a/models/subscription.go b/models/subscription.go new file mode 100644 index 0000000..191ffc6 --- /dev/null +++ b/models/subscription.go @@ -0,0 +1,57 @@ +package models + +import ( + "fmt" + "time" + + "git.capotej.com/capotej/communique/config" + "github.com/dgraph-io/badger/v3" +) + +type Subscription struct { + Handler config.Handler + InboxUrl string + CreatedAt time.Time +} + +// used for lookup purposes (count, collect, find) +func NewSubscription(h config.Handler) *Subscription { + aso := &Subscription{Handler: h} + return aso +} + +func CreateSubscription(h config.Handler, inboxUrl string) (*Subscription, error) { + aso := &Subscription{ + Handler: h, + InboxUrl: inboxUrl, + CreatedAt: time.Now(), + } + return aso, nil +} + +func (a *Subscription) Name() string { + return "Subscription" +} + +func (a *Subscription) Keybase() string { + keyBase := fmt.Sprintf("sub:%s", a.Handler.Name) + return keyBase +} + +func (a *Subscription) DedupKey() string { + return a.Key() +} + +func (a *Subscription) Key() string { + return fmt.Sprintf("sub:%s:%s", a.Handler.Name, a.InboxUrl) +} + +func (a *Subscription) SaveDedup(txn *badger.Txn) error { + txn.Discard() // nothing to do here + return nil +} + +func (a *Subscription) Save(txn *badger.Txn) error { + e := badger.NewEntry([]byte(a.Key()), []byte{}) + return txn.SetEntry(e) +} diff --git a/registry/registry.go b/registry/registry.go index dc5dff2..e15d861 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -210,32 +210,44 @@ func (r *Registry) Inbox(name string, req *http.Request, payload []byte) error { idProp := person.GetJSONLDId() idPropUrl := idProp.Get() inboxProp := person.GetActivityStreamsInbox() - url := inboxProp.GetIRI() - logger.With("actor", idPropUrl).With("inbox", url).Debugf("follow") + inboxUrl := inboxProp.GetIRI() + logger.With("actor", idPropUrl).With("inbox", inboxUrl).Debugf("follow") actorKeyUrl, err := urls.UrlProfileKey(handler.handlerCfg.Name, r.cfg.Domain) if err != nil { return err } - - r.deliverAcceptToInbox(url, actorUrl, actorKeyUrl, follow, handler.handlerCfg) - // r.subscribeActorToHandler() - return nil + err = r.deliverAcceptToInbox(inboxUrl, actorUrl, actorKeyUrl, follow, handler.handlerCfg) + if err != nil { + return err + } + return r.subscribeActorToHandler(handler.handlerCfg, inboxUrl.String()) }, func(c context.Context, note vocab.ActivityStreamsUndo) error { // Unfollow idProp := person.GetJSONLDId() idPropUrl := idProp.Get() inboxProp := person.GetActivityStreamsInbox() - url := inboxProp.GetIRI() - logger.With("actor", idPropUrl).With("inbox", url).Debugf("undo") - // r.unsubscribeActorToHandler() - return nil + inboxUrl := inboxProp.GetIRI() + logger.With("actor", idPropUrl).With("inbox", inboxUrl).Debugf("unfollow/undo") + return r.unsubscribeActorToHandler(handler.handlerCfg, inboxUrl.String()) }) err = resolver.Resolve(ctx, followData) return err } +func (r *Registry) subscribeActorToHandler(handler config.Handler, inboxUrl string) error { + aso, err := models.CreateSubscription(handler, inboxUrl) + r.persister.Store(aso) + return err +} + +func (r *Registry) unsubscribeActorToHandler(handler config.Handler, inboxUrl string) error { + aso, err := models.CreateSubscription(handler, inboxUrl) + r.persister.Delete(aso) + return err +} +// TODO should probably be in its own delivery package func (r *Registry) deliverAcceptToInbox(url, actorUrl, actorKeyUrl *url.URL, follow vocab.ActivityStreamsFollow, handler config.Handler) error { accept := streams.NewActivityStreamsAccept() actorProp := streams.NewActivityStreamsActorProperty() -- cgit v1.2.3