aboutsummaryrefslogtreecommitdiff
path: root/cgi/servers.go
diff options
context:
space:
mode:
Diffstat (limited to 'cgi/servers.go')
-rw-r--r--cgi/servers.go48
1 files changed, 15 insertions, 33 deletions
diff --git a/cgi/servers.go b/cgi/servers.go
index e0917f1..229bca2 100644
--- a/cgi/servers.go
+++ b/cgi/servers.go
@@ -13,19 +13,19 @@ import (
"time"
"git.capotej.com/capotej/communique/config"
- "github.com/dgraph-io/badger/v3"
+ "git.capotej.com/capotej/communique/models"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"go.uber.org/zap"
)
type Servers struct {
- log *zap.SugaredLogger
- db *badger.DB
+ log *zap.SugaredLogger
+ persister *models.Persister
}
-func NewServers(log *zap.SugaredLogger, db *badger.DB) *Servers {
- return &Servers{log: log, db: db}
+func NewServers(log *zap.SugaredLogger, persister *models.Persister) *Servers {
+ return &Servers{log: log, persister: persister}
}
// Start iterates over all Handlers and starts an internal CGI server for each one
@@ -41,7 +41,7 @@ func (s *Servers) Start(cfg config.Config) {
}(handler)
go func(aHandler config.Handler) {
defer wg.Done()
- startTicker(aHandler, s.db, s.log)
+ startTicker(aHandler, s.persister, s.log)
}(handler)
}
wg.Wait()
@@ -63,7 +63,7 @@ func startCGIServer(h config.Handler) {
server.Serve(unixListener)
}
-func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) {
+func startTicker(h config.Handler, persister *models.Persister, log *zap.SugaredLogger) {
ticker := time.NewTicker(h.Interval)
done := make(chan bool)
func() {
@@ -73,7 +73,7 @@ func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) {
return
case _ = <-ticker.C:
output := tick(h)
- err := processTick(h, output, db, log)
+ err := processTick(h, output, persister, log)
if err != nil {
log.Error(err)
}
@@ -82,7 +82,7 @@ func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) {
}()
}
-func processTick(h config.Handler, output []byte, db *badger.DB, log *zap.SugaredLogger) error {
+func processTick(h config.Handler, output []byte, persister *models.Persister, log *zap.SugaredLogger) error {
var m map[string]interface{}
err := json.Unmarshal(output, &m)
if err != nil {
@@ -101,35 +101,17 @@ func processTick(h config.Handler, output []byte, db *badger.DB, log *zap.Sugare
if err != nil {
return fmt.Errorf("could not resolve JSON: %w", err)
}
- keyBase := fmt.Sprintf("outbox:%s", h.Name)
- seq, err := db.GetSequence([]byte(keyBase), 10)
- defer seq.Release()
orderedProp := coll.GetActivityStreamsOrderedItems()
for iter := orderedProp.Begin(); iter != orderedProp.End(); iter = iter.Next() {
contentType := iter.GetType().GetTypeName()
log.Debugf("found object type %s in activity stream", contentType)
- contentNote := iter.GetActivityStreamsNote()
- if contentNote.GetActivityStreamsContent() != nil {
- content := contentNote.GetActivityStreamsContent()
- var contentVal string
- for contentIter := content.Begin(); contentIter != content.End(); contentIter = contentIter.Next() {
- contentVal = contentIter.GetXMLSchemaString()
- log.Debugf("contentVal is %+v", contentVal)
- autoinc, err := seq.Next()
- if err != nil {
- return fmt.Errorf("could not auto inc: %w", err)
- }
- keyName := fmt.Sprintf("%s:%d", keyBase, autoinc)
- err = db.Update(func(txn *badger.Txn) error {
- e := badger.NewEntry([]byte(keyName), []byte(contentVal))
- log.With("name", "ticker").Infof("writing '%s' to %s", contentVal, keyName)
- return txn.SetEntry(e)
- })
- if err != nil {
- return fmt.Errorf("could insert into outbox: %w", err)
- }
- }
+ contentNote := iter.GetActivityStreamsNote() // TODO make this configurable since it cant be dynamic
+
+ localNote := models.NewActivityStreamsObject(contentNote, h)
+ err = persister.Store(localNote)
+ if err != nil {
+ return err
}
}
return nil