diff options
Diffstat (limited to '')
-rw-r--r-- | cgi/servers.go | 48 |
1 files changed, 15 insertions, 33 deletions
diff --git a/cgi/servers.go b/cgi/servers.go index e0917f1..229bca2 100644 --- a/cgi/servers.go +++ b/cgi/servers.go @@ -13,19 +13,19 @@ import ( "time" "git.capotej.com/capotej/communique/config" - "github.com/dgraph-io/badger/v3" + "git.capotej.com/capotej/communique/models" "github.com/go-fed/activity/streams" "github.com/go-fed/activity/streams/vocab" "go.uber.org/zap" ) type Servers struct { - log *zap.SugaredLogger - db *badger.DB + log *zap.SugaredLogger + persister *models.Persister } -func NewServers(log *zap.SugaredLogger, db *badger.DB) *Servers { - return &Servers{log: log, db: db} +func NewServers(log *zap.SugaredLogger, persister *models.Persister) *Servers { + return &Servers{log: log, persister: persister} } // Start iterates over all Handlers and starts an internal CGI server for each one @@ -41,7 +41,7 @@ func (s *Servers) Start(cfg config.Config) { }(handler) go func(aHandler config.Handler) { defer wg.Done() - startTicker(aHandler, s.db, s.log) + startTicker(aHandler, s.persister, s.log) }(handler) } wg.Wait() @@ -63,7 +63,7 @@ func startCGIServer(h config.Handler) { server.Serve(unixListener) } -func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) { +func startTicker(h config.Handler, persister *models.Persister, log *zap.SugaredLogger) { ticker := time.NewTicker(h.Interval) done := make(chan bool) func() { @@ -73,7 +73,7 @@ func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) { return case _ = <-ticker.C: output := tick(h) - err := processTick(h, output, db, log) + err := processTick(h, output, persister, log) if err != nil { log.Error(err) } @@ -82,7 +82,7 @@ func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) { }() } -func processTick(h config.Handler, output []byte, db *badger.DB, log *zap.SugaredLogger) error { +func processTick(h config.Handler, output []byte, persister *models.Persister, log *zap.SugaredLogger) error { var m map[string]interface{} err := json.Unmarshal(output, &m) if err != nil { @@ -101,35 +101,17 @@ func processTick(h config.Handler, output []byte, db *badger.DB, log *zap.Sugare if err != nil { return fmt.Errorf("could not resolve JSON: %w", err) } - keyBase := fmt.Sprintf("outbox:%s", h.Name) - seq, err := db.GetSequence([]byte(keyBase), 10) - defer seq.Release() orderedProp := coll.GetActivityStreamsOrderedItems() for iter := orderedProp.Begin(); iter != orderedProp.End(); iter = iter.Next() { contentType := iter.GetType().GetTypeName() log.Debugf("found object type %s in activity stream", contentType) - contentNote := iter.GetActivityStreamsNote() - if contentNote.GetActivityStreamsContent() != nil { - content := contentNote.GetActivityStreamsContent() - var contentVal string - for contentIter := content.Begin(); contentIter != content.End(); contentIter = contentIter.Next() { - contentVal = contentIter.GetXMLSchemaString() - log.Debugf("contentVal is %+v", contentVal) - autoinc, err := seq.Next() - if err != nil { - return fmt.Errorf("could not auto inc: %w", err) - } - keyName := fmt.Sprintf("%s:%d", keyBase, autoinc) - err = db.Update(func(txn *badger.Txn) error { - e := badger.NewEntry([]byte(keyName), []byte(contentVal)) - log.With("name", "ticker").Infof("writing '%s' to %s", contentVal, keyName) - return txn.SetEntry(e) - }) - if err != nil { - return fmt.Errorf("could insert into outbox: %w", err) - } - } + contentNote := iter.GetActivityStreamsNote() // TODO make this configurable since it cant be dynamic + + localNote := models.NewActivityStreamsObject(contentNote, h) + err = persister.Store(localNote) + if err != nil { + return err } } return nil |