diff options
-rw-r--r-- | cgi/servers.go | 48 | ||||
-rw-r--r-- | go.mod | 1 | ||||
-rw-r--r-- | go.sum | 2 | ||||
-rw-r--r-- | http/server.go | 17 | ||||
-rw-r--r-- | main.go | 6 | ||||
-rw-r--r-- | resources/outbox.go | 28 | ||||
-rw-r--r-- | sample-config.toml | 3 |
7 files changed, 56 insertions, 49 deletions
diff --git a/cgi/servers.go b/cgi/servers.go index e0917f1..229bca2 100644 --- a/cgi/servers.go +++ b/cgi/servers.go @@ -13,19 +13,19 @@ import ( "time" "git.capotej.com/capotej/communique/config" - "github.com/dgraph-io/badger/v3" + "git.capotej.com/capotej/communique/models" "github.com/go-fed/activity/streams" "github.com/go-fed/activity/streams/vocab" "go.uber.org/zap" ) type Servers struct { - log *zap.SugaredLogger - db *badger.DB + log *zap.SugaredLogger + persister *models.Persister } -func NewServers(log *zap.SugaredLogger, db *badger.DB) *Servers { - return &Servers{log: log, db: db} +func NewServers(log *zap.SugaredLogger, persister *models.Persister) *Servers { + return &Servers{log: log, persister: persister} } // Start iterates over all Handlers and starts an internal CGI server for each one @@ -41,7 +41,7 @@ func (s *Servers) Start(cfg config.Config) { }(handler) go func(aHandler config.Handler) { defer wg.Done() - startTicker(aHandler, s.db, s.log) + startTicker(aHandler, s.persister, s.log) }(handler) } wg.Wait() @@ -63,7 +63,7 @@ func startCGIServer(h config.Handler) { server.Serve(unixListener) } -func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) { +func startTicker(h config.Handler, persister *models.Persister, log *zap.SugaredLogger) { ticker := time.NewTicker(h.Interval) done := make(chan bool) func() { @@ -73,7 +73,7 @@ func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) { return case _ = <-ticker.C: output := tick(h) - err := processTick(h, output, db, log) + err := processTick(h, output, persister, log) if err != nil { log.Error(err) } @@ -82,7 +82,7 @@ func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) { }() } -func processTick(h config.Handler, output []byte, db *badger.DB, log *zap.SugaredLogger) error { +func processTick(h config.Handler, output []byte, persister *models.Persister, log *zap.SugaredLogger) error { var m map[string]interface{} err := json.Unmarshal(output, &m) if err != nil { @@ -101,35 +101,17 @@ func processTick(h config.Handler, output []byte, db *badger.DB, log *zap.Sugare if err != nil { return fmt.Errorf("could not resolve JSON: %w", err) } - keyBase := fmt.Sprintf("outbox:%s", h.Name) - seq, err := db.GetSequence([]byte(keyBase), 10) - defer seq.Release() orderedProp := coll.GetActivityStreamsOrderedItems() for iter := orderedProp.Begin(); iter != orderedProp.End(); iter = iter.Next() { contentType := iter.GetType().GetTypeName() log.Debugf("found object type %s in activity stream", contentType) - contentNote := iter.GetActivityStreamsNote() - if contentNote.GetActivityStreamsContent() != nil { - content := contentNote.GetActivityStreamsContent() - var contentVal string - for contentIter := content.Begin(); contentIter != content.End(); contentIter = contentIter.Next() { - contentVal = contentIter.GetXMLSchemaString() - log.Debugf("contentVal is %+v", contentVal) - autoinc, err := seq.Next() - if err != nil { - return fmt.Errorf("could not auto inc: %w", err) - } - keyName := fmt.Sprintf("%s:%d", keyBase, autoinc) - err = db.Update(func(txn *badger.Txn) error { - e := badger.NewEntry([]byte(keyName), []byte(contentVal)) - log.With("name", "ticker").Infof("writing '%s' to %s", contentVal, keyName) - return txn.SetEntry(e) - }) - if err != nil { - return fmt.Errorf("could insert into outbox: %w", err) - } - } + contentNote := iter.GetActivityStreamsNote() // TODO make this configurable since it cant be dynamic + + localNote := models.NewActivityStreamsObject(contentNote, h) + err = persister.Store(localNote) + if err != nil { + return err } } return nil @@ -31,6 +31,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/segmentio/ksuid v1.0.4 // indirect github.com/ugorji/go/codec v1.2.7 // indirect go.opencensus.io v0.24.0 // indirect go.uber.org/atomic v1.10.0 // indirect @@ -128,6 +128,8 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c= +github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= diff --git a/http/server.go b/http/server.go index b3d3c9b..b02e5c9 100644 --- a/http/server.go +++ b/http/server.go @@ -76,5 +76,22 @@ func (s *Server) Start(zapWriter io.Writer) { } }) + // // outbox single + // router.GET("/actors/:actor/outbox/:id", func(c *gin.Context) { + // actorParam := c.Param("actor") + // var resource map[string]interface{} + // if c.Query("page") == "true" { + // resource, _ = s.registry.OutboxPage(actorParam) + // } else { + // resource, _ = s.registry.Outbox(actorParam) + // } + // if resource != nil { + // c.Writer.Header().Set("Content-Type", "application/activity+json") + // c.JSON(http.StatusOK, resource) + // } else { + // c.JSON(http.StatusNotFound, nil) + // } + // }) + router.Run() } @@ -6,6 +6,7 @@ import ( "git.capotej.com/capotej/communique/cgi" "git.capotej.com/capotej/communique/config" "git.capotej.com/capotej/communique/http" + "git.capotej.com/capotej/communique/models" "git.capotej.com/capotej/communique/registry" "github.com/BurntSushi/toml" "github.com/dgraph-io/badger/v3" @@ -38,6 +39,9 @@ func main() { } defer db.Close() + // Persister + persister := models.NewPersister(log, db) + // Registry registry := registry.NewRegistry(cfg, db) @@ -45,7 +49,7 @@ func main() { var mainWg sync.WaitGroup // // Internal CGI Servers - cgiServers := cgi.NewServers(log, db) + cgiServers := cgi.NewServers(log, persister) mainWg.Add(1) go cgiServers.Start(cfg) diff --git a/resources/outbox.go b/resources/outbox.go index 93bed92..e7004c4 100644 --- a/resources/outbox.go +++ b/resources/outbox.go @@ -84,20 +84,20 @@ func RenderOutbox(name, domain string, db *badger.DB) (map[string]interface{}, e oc.SetJSONLDId(idProp) var i int - err = db.View(func(txn *badger.Txn) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchValues = false - it := txn.NewIterator(opts) - defer it.Close() - prefix := []byte("outbox:sample") // TODO - for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { - i++ - } - return nil - }) - if err != nil { - return nil, err - } + // err = db.View(func(txn *badger.Txn) error { + // opts := badger.DefaultIteratorOptions + // opts.PrefetchValues = false + // it := txn.NewIterator(opts) + // defer it.Close() + // prefix := []byte("outbox:sample") // TODO + // for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + // i++ + // } + // return nil + // }) + // if err != nil { + // return nil, err + // } itemsProp := streams.NewActivityStreamsTotalItemsProperty() itemsProp.Set(i) diff --git a/sample-config.toml b/sample-config.toml index 38a71f7..a89678a 100644 --- a/sample-config.toml +++ b/sample-config.toml @@ -3,7 +3,8 @@ dbPath = "bub.db" [[handlers]] name = "sample" -rpc = "cgi" +rpc = "cgi" # rename to protocol? +# add response type? like Note exec = "sample-cgi-handler.sh" interval = "5s" |