aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--main.go1
-rw-r--r--models/persister.go10
-rw-r--r--models/subscription.go57
-rw-r--r--registry/registry.go32
4 files changed, 90 insertions, 10 deletions
diff --git a/main.go b/main.go
index e5f530a..9e3c6f3 100644
--- a/main.go
+++ b/main.go
@@ -24,6 +24,7 @@ func main() {
// Config
var cfg config.Config
//TODO use a flag here
+ //TODO ensure that handler names only contain AZa-z and '-'
_, err := toml.DecodeFile("sample-config.toml", &cfg)
if err != nil {
log.Fatal(err)
diff --git a/models/persister.go b/models/persister.go
index c6e0c72..5f79e4e 100644
--- a/models/persister.go
+++ b/models/persister.go
@@ -43,6 +43,16 @@ func (p *Persister) Store(model model) error {
return err
}
+func (p *Persister) Delete(model model) error {
+ log := p.log.With("model", model.Name()).With("DedupKey", model.DedupKey()).With("Key", model.Key())
+ log.Debug("Delete()")
+ err := p.db.Update(func(txn *badger.Txn) error {
+ txn.Delete([]byte(model.Key()))
+ return nil
+ })
+ return err
+}
+
func (p *Persister) Count(model model) (int, error) {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
diff --git a/models/subscription.go b/models/subscription.go
new file mode 100644
index 0000000..191ffc6
--- /dev/null
+++ b/models/subscription.go
@@ -0,0 +1,57 @@
+package models
+
+import (
+ "fmt"
+ "time"
+
+ "git.capotej.com/capotej/communique/config"
+ "github.com/dgraph-io/badger/v3"
+)
+
+type Subscription struct {
+ Handler config.Handler
+ InboxUrl string
+ CreatedAt time.Time
+}
+
+// used for lookup purposes (count, collect, find)
+func NewSubscription(h config.Handler) *Subscription {
+ aso := &Subscription{Handler: h}
+ return aso
+}
+
+func CreateSubscription(h config.Handler, inboxUrl string) (*Subscription, error) {
+ aso := &Subscription{
+ Handler: h,
+ InboxUrl: inboxUrl,
+ CreatedAt: time.Now(),
+ }
+ return aso, nil
+}
+
+func (a *Subscription) Name() string {
+ return "Subscription"
+}
+
+func (a *Subscription) Keybase() string {
+ keyBase := fmt.Sprintf("sub:%s", a.Handler.Name)
+ return keyBase
+}
+
+func (a *Subscription) DedupKey() string {
+ return a.Key()
+}
+
+func (a *Subscription) Key() string {
+ return fmt.Sprintf("sub:%s:%s", a.Handler.Name, a.InboxUrl)
+}
+
+func (a *Subscription) SaveDedup(txn *badger.Txn) error {
+ txn.Discard() // nothing to do here
+ return nil
+}
+
+func (a *Subscription) Save(txn *badger.Txn) error {
+ e := badger.NewEntry([]byte(a.Key()), []byte{})
+ return txn.SetEntry(e)
+}
diff --git a/registry/registry.go b/registry/registry.go
index dc5dff2..e15d861 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -210,32 +210,44 @@ func (r *Registry) Inbox(name string, req *http.Request, payload []byte) error {
idProp := person.GetJSONLDId()
idPropUrl := idProp.Get()
inboxProp := person.GetActivityStreamsInbox()
- url := inboxProp.GetIRI()
- logger.With("actor", idPropUrl).With("inbox", url).Debugf("follow")
+ inboxUrl := inboxProp.GetIRI()
+ logger.With("actor", idPropUrl).With("inbox", inboxUrl).Debugf("follow")
actorKeyUrl, err := urls.UrlProfileKey(handler.handlerCfg.Name, r.cfg.Domain)
if err != nil {
return err
}
-
- r.deliverAcceptToInbox(url, actorUrl, actorKeyUrl, follow, handler.handlerCfg)
- // r.subscribeActorToHandler()
- return nil
+ err = r.deliverAcceptToInbox(inboxUrl, actorUrl, actorKeyUrl, follow, handler.handlerCfg)
+ if err != nil {
+ return err
+ }
+ return r.subscribeActorToHandler(handler.handlerCfg, inboxUrl.String())
}, func(c context.Context, note vocab.ActivityStreamsUndo) error {
// Unfollow
idProp := person.GetJSONLDId()
idPropUrl := idProp.Get()
inboxProp := person.GetActivityStreamsInbox()
- url := inboxProp.GetIRI()
- logger.With("actor", idPropUrl).With("inbox", url).Debugf("undo")
- // r.unsubscribeActorToHandler()
- return nil
+ inboxUrl := inboxProp.GetIRI()
+ logger.With("actor", idPropUrl).With("inbox", inboxUrl).Debugf("unfollow/undo")
+ return r.unsubscribeActorToHandler(handler.handlerCfg, inboxUrl.String())
})
err = resolver.Resolve(ctx, followData)
return err
}
+func (r *Registry) subscribeActorToHandler(handler config.Handler, inboxUrl string) error {
+ aso, err := models.CreateSubscription(handler, inboxUrl)
+ r.persister.Store(aso)
+ return err
+}
+
+func (r *Registry) unsubscribeActorToHandler(handler config.Handler, inboxUrl string) error {
+ aso, err := models.CreateSubscription(handler, inboxUrl)
+ r.persister.Delete(aso)
+ return err
+}
+// TODO should probably be in its own delivery package
func (r *Registry) deliverAcceptToInbox(url, actorUrl, actorKeyUrl *url.URL, follow vocab.ActivityStreamsFollow, handler config.Handler) error {
accept := streams.NewActivityStreamsAccept()
actorProp := streams.NewActivityStreamsActorProperty()