aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cgi/servers.go48
-rw-r--r--go.mod1
-rw-r--r--go.sum2
-rw-r--r--http/server.go17
-rw-r--r--main.go6
-rw-r--r--resources/outbox.go28
-rw-r--r--sample-config.toml3
7 files changed, 56 insertions, 49 deletions
diff --git a/cgi/servers.go b/cgi/servers.go
index e0917f1..229bca2 100644
--- a/cgi/servers.go
+++ b/cgi/servers.go
@@ -13,19 +13,19 @@ import (
"time"
"git.capotej.com/capotej/communique/config"
- "github.com/dgraph-io/badger/v3"
+ "git.capotej.com/capotej/communique/models"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"go.uber.org/zap"
)
type Servers struct {
- log *zap.SugaredLogger
- db *badger.DB
+ log *zap.SugaredLogger
+ persister *models.Persister
}
-func NewServers(log *zap.SugaredLogger, db *badger.DB) *Servers {
- return &Servers{log: log, db: db}
+func NewServers(log *zap.SugaredLogger, persister *models.Persister) *Servers {
+ return &Servers{log: log, persister: persister}
}
// Start iterates over all Handlers and starts an internal CGI server for each one
@@ -41,7 +41,7 @@ func (s *Servers) Start(cfg config.Config) {
}(handler)
go func(aHandler config.Handler) {
defer wg.Done()
- startTicker(aHandler, s.db, s.log)
+ startTicker(aHandler, s.persister, s.log)
}(handler)
}
wg.Wait()
@@ -63,7 +63,7 @@ func startCGIServer(h config.Handler) {
server.Serve(unixListener)
}
-func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) {
+func startTicker(h config.Handler, persister *models.Persister, log *zap.SugaredLogger) {
ticker := time.NewTicker(h.Interval)
done := make(chan bool)
func() {
@@ -73,7 +73,7 @@ func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) {
return
case _ = <-ticker.C:
output := tick(h)
- err := processTick(h, output, db, log)
+ err := processTick(h, output, persister, log)
if err != nil {
log.Error(err)
}
@@ -82,7 +82,7 @@ func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) {
}()
}
-func processTick(h config.Handler, output []byte, db *badger.DB, log *zap.SugaredLogger) error {
+func processTick(h config.Handler, output []byte, persister *models.Persister, log *zap.SugaredLogger) error {
var m map[string]interface{}
err := json.Unmarshal(output, &m)
if err != nil {
@@ -101,35 +101,17 @@ func processTick(h config.Handler, output []byte, db *badger.DB, log *zap.Sugare
if err != nil {
return fmt.Errorf("could not resolve JSON: %w", err)
}
- keyBase := fmt.Sprintf("outbox:%s", h.Name)
- seq, err := db.GetSequence([]byte(keyBase), 10)
- defer seq.Release()
orderedProp := coll.GetActivityStreamsOrderedItems()
for iter := orderedProp.Begin(); iter != orderedProp.End(); iter = iter.Next() {
contentType := iter.GetType().GetTypeName()
log.Debugf("found object type %s in activity stream", contentType)
- contentNote := iter.GetActivityStreamsNote()
- if contentNote.GetActivityStreamsContent() != nil {
- content := contentNote.GetActivityStreamsContent()
- var contentVal string
- for contentIter := content.Begin(); contentIter != content.End(); contentIter = contentIter.Next() {
- contentVal = contentIter.GetXMLSchemaString()
- log.Debugf("contentVal is %+v", contentVal)
- autoinc, err := seq.Next()
- if err != nil {
- return fmt.Errorf("could not auto inc: %w", err)
- }
- keyName := fmt.Sprintf("%s:%d", keyBase, autoinc)
- err = db.Update(func(txn *badger.Txn) error {
- e := badger.NewEntry([]byte(keyName), []byte(contentVal))
- log.With("name", "ticker").Infof("writing '%s' to %s", contentVal, keyName)
- return txn.SetEntry(e)
- })
- if err != nil {
- return fmt.Errorf("could insert into outbox: %w", err)
- }
- }
+ contentNote := iter.GetActivityStreamsNote() // TODO make this configurable since it cant be dynamic
+
+ localNote := models.NewActivityStreamsObject(contentNote, h)
+ err = persister.Store(localNote)
+ if err != nil {
+ return err
}
}
return nil
diff --git a/go.mod b/go.mod
index 0272442..f0c839d 100644
--- a/go.mod
+++ b/go.mod
@@ -31,6 +31,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pkg/errors v0.9.1 // indirect
+ github.com/segmentio/ksuid v1.0.4 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
diff --git a/go.sum b/go.sum
index b281389..983bb88 100644
--- a/go.sum
+++ b/go.sum
@@ -128,6 +128,8 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
+github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c=
+github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
diff --git a/http/server.go b/http/server.go
index b3d3c9b..b02e5c9 100644
--- a/http/server.go
+++ b/http/server.go
@@ -76,5 +76,22 @@ func (s *Server) Start(zapWriter io.Writer) {
}
})
+ // // outbox single
+ // router.GET("/actors/:actor/outbox/:id", func(c *gin.Context) {
+ // actorParam := c.Param("actor")
+ // var resource map[string]interface{}
+ // if c.Query("page") == "true" {
+ // resource, _ = s.registry.OutboxPage(actorParam)
+ // } else {
+ // resource, _ = s.registry.Outbox(actorParam)
+ // }
+ // if resource != nil {
+ // c.Writer.Header().Set("Content-Type", "application/activity+json")
+ // c.JSON(http.StatusOK, resource)
+ // } else {
+ // c.JSON(http.StatusNotFound, nil)
+ // }
+ // })
+
router.Run()
}
diff --git a/main.go b/main.go
index aa92f9f..064ba93 100644
--- a/main.go
+++ b/main.go
@@ -6,6 +6,7 @@ import (
"git.capotej.com/capotej/communique/cgi"
"git.capotej.com/capotej/communique/config"
"git.capotej.com/capotej/communique/http"
+ "git.capotej.com/capotej/communique/models"
"git.capotej.com/capotej/communique/registry"
"github.com/BurntSushi/toml"
"github.com/dgraph-io/badger/v3"
@@ -38,6 +39,9 @@ func main() {
}
defer db.Close()
+ // Persister
+ persister := models.NewPersister(log, db)
+
// Registry
registry := registry.NewRegistry(cfg, db)
@@ -45,7 +49,7 @@ func main() {
var mainWg sync.WaitGroup
// // Internal CGI Servers
- cgiServers := cgi.NewServers(log, db)
+ cgiServers := cgi.NewServers(log, persister)
mainWg.Add(1)
go cgiServers.Start(cfg)
diff --git a/resources/outbox.go b/resources/outbox.go
index 93bed92..e7004c4 100644
--- a/resources/outbox.go
+++ b/resources/outbox.go
@@ -84,20 +84,20 @@ func RenderOutbox(name, domain string, db *badger.DB) (map[string]interface{}, e
oc.SetJSONLDId(idProp)
var i int
- err = db.View(func(txn *badger.Txn) error {
- opts := badger.DefaultIteratorOptions
- opts.PrefetchValues = false
- it := txn.NewIterator(opts)
- defer it.Close()
- prefix := []byte("outbox:sample") // TODO
- for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
- i++
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
+ // err = db.View(func(txn *badger.Txn) error {
+ // opts := badger.DefaultIteratorOptions
+ // opts.PrefetchValues = false
+ // it := txn.NewIterator(opts)
+ // defer it.Close()
+ // prefix := []byte("outbox:sample") // TODO
+ // for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
+ // i++
+ // }
+ // return nil
+ // })
+ // if err != nil {
+ // return nil, err
+ // }
itemsProp := streams.NewActivityStreamsTotalItemsProperty()
itemsProp.Set(i)
diff --git a/sample-config.toml b/sample-config.toml
index 38a71f7..a89678a 100644
--- a/sample-config.toml
+++ b/sample-config.toml
@@ -3,7 +3,8 @@ dbPath = "bub.db"
[[handlers]]
name = "sample"
-rpc = "cgi"
+rpc = "cgi" # rename to protocol?
+# add response type? like Note
exec = "sample-cgi-handler.sh"
interval = "5s"