aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--cgi/servers.go39
-rw-r--r--config/config.go10
-rw-r--r--main.go19
-rw-r--r--registry/registry.go8
-rw-r--r--resources/outbox.go29
-rw-r--r--sample-config.toml3
7 files changed, 86 insertions, 24 deletions
diff --git a/.gitignore b/.gitignore
index ba2906d..ee66908 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,3 @@
main
+*.db
+*.sock \ No newline at end of file
diff --git a/cgi/servers.go b/cgi/servers.go
index 0f23aac..84d6ffe 100644
--- a/cgi/servers.go
+++ b/cgi/servers.go
@@ -3,7 +3,7 @@ package cgi
import (
"context"
"fmt"
- "io"
+ "io/ioutil"
"net"
"net/http"
"net/http/cgi"
@@ -12,12 +12,17 @@ import (
"time"
"git.capotej.com/capotej/communique/config"
+ "github.com/dgraph-io/badger/v3"
+ "go.uber.org/zap"
)
-type Servers struct{}
+type Servers struct {
+ log *zap.SugaredLogger
+ db *badger.DB
+}
-func NewServers() *Servers {
- return &Servers{}
+func NewServers(log *zap.SugaredLogger, db *badger.DB) *Servers {
+ return &Servers{log: log, db: db}
}
// Start iterates over all Handlers and starts an internal CGI server for each one
@@ -33,7 +38,7 @@ func (s *Servers) Start(cfg config.Config) {
}(handler)
go func(aHandler config.Handler) {
defer wg.Done()
- startTicker(aHandler)
+ startTicker(aHandler, s.db, s.log)
}(handler)
}
wg.Wait()
@@ -55,23 +60,32 @@ func startCGIServer(h config.Handler) {
server.Serve(unixListener)
}
-func startTicker(h config.Handler) {
+func startTicker(h config.Handler, db *badger.DB, log *zap.SugaredLogger) {
// TODO add config for this
- ticker := time.NewTicker(5 * time.Second)
+ ticker := time.NewTicker(h.Interval)
done := make(chan bool)
func() {
for {
select {
case <-done:
return
- case _ = <-ticker.C:
- tick(h)
+ case t := <-ticker.C:
+ output := tick(h)
+ keyName := fmt.Sprintf("outbox:%s:%d", h.Name, t.Unix())
+ err := db.Update(func(txn *badger.Txn) error {
+ e := badger.NewEntry([]byte(keyName), output)
+ log.With("name", "ticker").Infof("writing '%s' to %s", output, keyName)
+ return txn.SetEntry(e)
+ })
+ if err != nil {
+ log.Error(err)
+ }
}
}
}()
}
-func tick(h config.Handler) {
+func tick(h config.Handler) []byte {
sock := fmt.Sprintf("%s.sock", h.Name)
httpc := http.Client{
Transport: &http.Transport{
@@ -86,5 +100,8 @@ func tick(h config.Handler) {
if err != nil {
panic(err)
}
- io.Copy(os.Stdout, response.Body)
+
+ body, err := ioutil.ReadAll(response.Body)
+ response.Body.Close()
+ return body
}
diff --git a/config/config.go b/config/config.go
index 84024d7..e8e5387 100644
--- a/config/config.go
+++ b/config/config.go
@@ -1,12 +1,16 @@
package config
+import "time"
+
type Config struct {
Domain string
Handlers []Handler
+ DbPath string
}
type Handler struct {
- Name string
- Exec string
- Rpc string
+ Name string
+ Exec string
+ Rpc string
+ Interval time.Duration
}
diff --git a/main.go b/main.go
index 165cacb..fe2bb30 100644
--- a/main.go
+++ b/main.go
@@ -8,6 +8,7 @@ import (
"git.capotej.com/capotej/communique/http"
"git.capotej.com/capotej/communique/registry"
"github.com/BurntSushi/toml"
+ "github.com/dgraph-io/badger/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapio"
)
@@ -27,16 +28,26 @@ func main() {
log.Fatal(err)
}
log.Debugf("Loaded TOML Config: %+v", cfg)
- registry := registry.NewRegistry(cfg)
+ // DB
+ db, err := badger.Open(badger.DefaultOptions(cfg.DbPath))
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer db.Close()
+
+ // Registry
+ registry := registry.NewRegistry(cfg, db)
+
+ // Servers
var mainWg sync.WaitGroup
- // Internal CGI Servers
- cgiServers := cgi.NewServers()
+ // // Internal CGI Servers
+ cgiServers := cgi.NewServers(log, db)
mainWg.Add(1)
go cgiServers.Start(cfg)
- // External Http Server
+ // // External Http Server
writer := &zapio.Writer{Log: logger, Level: zap.DebugLevel}
defer writer.Close()
server := http.NewServer(registry)
diff --git a/registry/registry.go b/registry/registry.go
index b12b773..8d047ff 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -5,6 +5,7 @@ import (
"git.capotej.com/capotej/communique/config"
"git.capotej.com/capotej/communique/resources"
+ "github.com/dgraph-io/badger/v3"
)
type Handler struct {
@@ -13,11 +14,12 @@ type Handler struct {
type Registry struct {
cfg config.Config
+ db *badger.DB
handlerMap map[string]Handler
}
-func NewRegistry(cfg config.Config) *Registry {
- reg := Registry{cfg: cfg}
+func NewRegistry(cfg config.Config, db *badger.DB) *Registry {
+ reg := Registry{cfg: cfg, db: db}
reg.handlerMap = make(map[string]Handler)
for _, v := range cfg.Handlers {
reg.handlerMap[fqn(v.Name, cfg.Domain)] = Handler{handlerCfg: v}
@@ -38,7 +40,7 @@ func (r *Registry) Outbox(name string) (map[string]interface{}, error) {
if handler == nil {
return nil, nil
}
- return resources.RenderOutbox(handler.handlerCfg.Name, r.cfg.Domain)
+ return resources.RenderOutbox(handler.handlerCfg.Name, r.cfg.Domain, r.db)
}
func (r *Registry) WebfingerResource(fqn string) (*resources.WebfingerResource, error) {
diff --git a/resources/outbox.go b/resources/outbox.go
index f9a4aef..046ec6a 100644
--- a/resources/outbox.go
+++ b/resources/outbox.go
@@ -5,10 +5,11 @@ import (
"net/url"
"path"
+ "github.com/dgraph-io/badger/v3"
"github.com/go-fed/activity/streams"
)
-func RenderOutbox(name, domain string) (map[string]interface{}, error) {
+func RenderOutbox(name, domain string, db *badger.DB) (map[string]interface{}, error) {
id, err := url.Parse(path.Join("https://", domain, "actors", name, "outbox"))
if err != nil {
@@ -21,13 +22,37 @@ func RenderOutbox(name, domain string) (map[string]interface{}, error) {
}
oc := streams.NewActivityStreamsOrderedCollection()
+
idProp := streams.NewJSONLDIdProperty()
idProp.Set(id)
oc.SetJSONLDId(idProp)
+ var i int
+ db.View(func(txn *badger.Txn) error {
+ it := txn.NewIterator(badger.DefaultIteratorOptions)
+ defer it.Close()
+ prefix := []byte("outbox:sample")
+ for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
+ item := it.Item()
+ // k := item.Key()
+ err := item.Value(func(v []byte) error {
+ i++
+ // fmt.Printf("key=%s, value=%s\n", k, v)
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+
+ itemsProp := streams.NewActivityStreamsTotalItemsProperty()
+ itemsProp.Set(i)
+ oc.SetActivityStreamsTotalItems(itemsProp)
+
ocProp := streams.NewActivityStreamsFirstProperty()
ocProp.SetIRI(first)
-
oc.SetActivityStreamsFirst(ocProp)
return streams.Serialize(oc)
diff --git a/sample-config.toml b/sample-config.toml
index 2d0423f..6a8b6f9 100644
--- a/sample-config.toml
+++ b/sample-config.toml
@@ -1,10 +1,11 @@
domain = "activitybub.xyz"
+dbPath = "bub.db"
[[handlers]]
name = "sample"
rpc = "cgi"
exec = "sample-cgi-handler.sh"
-intervalSeconds = "60"
+interval = "10s"
# [[handlers]]
# name = "another"