From 3761f85966a554cb750809da21995354fb9f9ceb Mon Sep 17 00:00:00 2001 From: Julio Capote Date: Sun, 18 Dec 2022 22:00:36 -0500 Subject: uncommit --- .gitignore | 2 ++ cgi/servers.go | 39 ++++++++++++++++++++++++++++----------- config/config.go | 10 +++++++--- main.go | 19 +++++++++++++++---- registry/registry.go | 8 +++++--- resources/outbox.go | 29 +++++++++++++++++++++++++++-- sample-config.toml | 3 ++- 7 files changed, 86 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index ba2906d..ee66908 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ main +*.db +*.sock \ No newline at end of file diff --git a/cgi/servers.go b/cgi/servers.go index 0f23aac..84d6ffe 100644 --- a/cgi/servers.go +++ b/cgi/servers.go @@ -3,7 +3,7 @@ package cgi import ( "context" "fmt" - "io" + "io/ioutil" "net" "net/http" "net/http/cgi" @@ -12,12 +12,17 @@ import ( "time" "git.capotej.com/capotej/communique/config" + "github.com/dgraph-io/badger/v3" + "go.uber.org/zap" ) -type Servers struct{} +type Servers struct { + log *zap.SugaredLogger + db *badger.DB +} -func NewServers() *Servers { - return &Servers{} +func NewServers(log *zap.SugaredLogger, db *badger.DB) *Servers { + return &Servers{log: log, db: db} } // Start iterates over all Handlers and starts an internal CGI server for each one @@ -33,7 +38,7 @@ func (s *Servers) Start(cfg config.Config) { }(handler) go func(aHandler config.Handler) { defer wg.Done() - startTicker(aHandler) + startTicker(aHandler, s.db, s.log) }(handler) } wg.Wait() @@ -55,23 +60,32 @@ func startCGIServer(h config.Handler) { server.Serve(unixListener) } -func startTicker(h config.Handler) { +func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) { // TODO add config for this - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(h.Interval) done := make(chan bool) func() { for { select { case <-done: return - case _ = <-ticker.C: - tick(h) + case t := <-ticker.C: + output := tick(h) + keyName := fmt.Sprintf("outbox:%s:%d", h.Name, t.Unix()) + err := db.Update(func(txn *badger.Txn) error { + e := badger.NewEntry([]byte(keyName), output) + log.With("name", "ticker").Infof("writing '%s' to %s", output, keyName) + return txn.SetEntry(e) + }) + if err != nil { + log.Error(err) + } } } }() } -func tick(h config.Handler) { +func tick(h config.Handler) []byte { sock := fmt.Sprintf("%s.sock", h.Name) httpc := http.Client{ Transport: &http.Transport{ @@ -86,5 +100,8 @@ func tick(h config.Handler) { if err != nil { panic(err) } - io.Copy(os.Stdout, response.Body) + + body, err := ioutil.ReadAll(response.Body) + response.Body.Close() + return body } diff --git a/config/config.go b/config/config.go index 84024d7..e8e5387 100644 --- a/config/config.go +++ b/config/config.go @@ -1,12 +1,16 @@ package config +import "time" + type Config struct { Domain string Handlers []Handler + DbPath string } type Handler struct { - Name string - Exec string - Rpc string + Name string + Exec string + Rpc string + Interval time.Duration } diff --git a/main.go b/main.go index 165cacb..fe2bb30 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "git.capotej.com/capotej/communique/http" "git.capotej.com/capotej/communique/registry" "github.com/BurntSushi/toml" + "github.com/dgraph-io/badger/v3" "go.uber.org/zap" "go.uber.org/zap/zapio" ) @@ -27,16 +28,26 @@ func main() { log.Fatal(err) } log.Debugf("Loaded TOML Config: %+v", cfg) - registry := registry.NewRegistry(cfg) + // DB + db, err := badger.Open(badger.DefaultOptions(cfg.DbPath)) + if err != nil { + log.Fatal(err) + } + defer db.Close() + + // Registry + registry := registry.NewRegistry(cfg, db) + + // Servers var mainWg sync.WaitGroup - // Internal CGI Servers - cgiServers := cgi.NewServers() + // // Internal CGI Servers + cgiServers := cgi.NewServers(log, db) mainWg.Add(1) go cgiServers.Start(cfg) - // External Http Server + // // External Http Server writer := &zapio.Writer{Log: logger, Level: zap.DebugLevel} defer writer.Close() server := http.NewServer(registry) diff --git a/registry/registry.go b/registry/registry.go index b12b773..8d047ff 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -5,6 +5,7 @@ import ( "git.capotej.com/capotej/communique/config" "git.capotej.com/capotej/communique/resources" + "github.com/dgraph-io/badger/v3" ) type Handler struct { @@ -13,11 +14,12 @@ type Handler struct { type Registry struct { cfg config.Config + db *badger.DB handlerMap map[string]Handler } -func NewRegistry(cfg config.Config) *Registry { - reg := Registry{cfg: cfg} +func NewRegistry(cfg config.Config, db *badger.DB) *Registry { + reg := Registry{cfg: cfg, db: db} reg.handlerMap = make(map[string]Handler) for _, v := range cfg.Handlers { reg.handlerMap[fqn(v.Name, cfg.Domain)] = Handler{handlerCfg: v} @@ -38,7 +40,7 @@ func (r *Registry) Outbox(name string) (map[string]interface{}, error) { if handler == nil { return nil, nil } - return resources.RenderOutbox(handler.handlerCfg.Name, r.cfg.Domain) + return resources.RenderOutbox(handler.handlerCfg.Name, r.cfg.Domain, r.db) } func (r *Registry) WebfingerResource(fqn string) (*resources.WebfingerResource, error) { diff --git a/resources/outbox.go b/resources/outbox.go index f9a4aef..046ec6a 100644 --- a/resources/outbox.go +++ b/resources/outbox.go @@ -5,10 +5,11 @@ import ( "net/url" "path" + "github.com/dgraph-io/badger/v3" "github.com/go-fed/activity/streams" ) -func RenderOutbox(name, domain string) (map[string]interface{}, error) { +func RenderOutbox(name, domain string, db *badger.DB) (map[string]interface{}, error) { id, err := url.Parse(path.Join("https://", domain, "actors", name, "outbox")) if err != nil { @@ -21,13 +22,37 @@ func RenderOutbox(name, domain string) (map[string]interface{}, error) { } oc := streams.NewActivityStreamsOrderedCollection() + idProp := streams.NewJSONLDIdProperty() idProp.Set(id) oc.SetJSONLDId(idProp) + var i int + db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + prefix := []byte("outbox:sample") + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + // k := item.Key() + err := item.Value(func(v []byte) error { + i++ + // fmt.Printf("key=%s, value=%s\n", k, v) + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + itemsProp := streams.NewActivityStreamsTotalItemsProperty() + itemsProp.Set(i) + oc.SetActivityStreamsTotalItems(itemsProp) + ocProp := streams.NewActivityStreamsFirstProperty() ocProp.SetIRI(first) - oc.SetActivityStreamsFirst(ocProp) return streams.Serialize(oc) diff --git a/sample-config.toml b/sample-config.toml index 2d0423f..6a8b6f9 100644 --- a/sample-config.toml +++ b/sample-config.toml @@ -1,10 +1,11 @@ domain = "activitybub.xyz" +dbPath = "bub.db" [[handlers]] name = "sample" rpc = "cgi" exec = "sample-cgi-handler.sh" -intervalSeconds = "60" +interval = "10s" # [[handlers]] # name = "another" -- cgit v1.2.3