From fa1d778c0d8d7292eb8e2b29dd9d8a49e11c71df Mon Sep 17 00:00:00 2001 From: Julio Capote Date: Fri, 6 Jan 2023 15:25:56 -0500 Subject: try delivering new items to subscribers --- cgi/servers.go | 59 +++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/cgi/servers.go b/cgi/servers.go index 44ffb1c..3183a93 100644 --- a/cgi/servers.go +++ b/cgi/servers.go @@ -2,18 +2,23 @@ package cgi import ( "context" + "encoding/json" "fmt" + "io" "io/ioutil" "net" "net/http" "net/http/cgi" + "net/url" "os" "strings" "sync" "time" "git.capotej.com/capotej/communique/config" + "git.capotej.com/capotej/communique/delivery" "git.capotej.com/capotej/communique/models" + "git.capotej.com/capotej/communique/urls" "git.capotej.com/capotej/communique/views" "github.com/mmcdole/gofeed" "go.uber.org/zap" @@ -35,6 +40,12 @@ func (s *Servers) Start() { var wg sync.WaitGroup logger := s.log.With("type", "cgi") + signed, err := delivery.NewSigned(s.persister) + if err != nil { + panic(err) + return + } + for _, handler := range s.cfg.Handlers { handlerLogger := logger.With("handler", handler.Name) wg.Add(3) @@ -46,14 +57,14 @@ func (s *Servers) Start() { // Ticker go func(aHandler config.Handler) { defer wg.Done() - startTicker(aHandler, s.persister, handlerLogger, s.cfg) + startTicker(aHandler, s.persister, handlerLogger, s.cfg, signed) }(handler) // Execute a handler tick on start since Go's ticker waits until $interval to trigger first tick go func(aHandler config.Handler) { defer wg.Done() time.Sleep(1 * time.Second) output := tick(aHandler, handlerLogger) - err := processTick(aHandler, output, s.persister, handlerLogger, s.cfg) + err := processTick(aHandler, output, s.persister, handlerLogger, s.cfg, signed) if err != nil { s.log.Error(err) } @@ -80,7 +91,7 @@ func startCGIServer(h config.Handler, log *zap.SugaredLogger) { server.Serve(unixListener) } -func startTicker(h config.Handler, persister *models.Persister, log *zap.SugaredLogger, cfg config.Config) { +func startTicker(h config.Handler, persister *models.Persister, log *zap.SugaredLogger, cfg config.Config, signed *delivery.Signed) { ticker := time.NewTicker(h.Interval) // TODO add some random jitter here so handlers dont run at the same exact intervals done := make(chan bool) func() { @@ -90,7 +101,7 @@ func startTicker(h config.Handler, persister *models.Persister, log *zap.Sugared return case _ = <-ticker.C: output := tick(h, log) - err := processTick(h, output, persister, log, cfg) + err := processTick(h, output, persister, log, cfg, signed) if err != nil { log.Error(err) } @@ -99,7 +110,7 @@ func startTicker(h config.Handler, persister *models.Persister, log *zap.Sugared }() } -func processTick(h config.Handler, output []byte, persister *models.Persister, log *zap.SugaredLogger, cfg config.Config) error { +func processTick(h config.Handler, output []byte, persister *models.Persister, log *zap.SugaredLogger, cfg config.Config, signed *delivery.Signed) error { fp := gofeed.NewParser() fp.ParseString(string(output)) feed, err := fp.ParseString(string(output)) @@ -128,16 +139,46 @@ func processTick(h config.Handler, output []byte, persister *models.Persister, l logger.Error(err) return } - payload, err := views.RenderActivity(h.Name, cfg.Domain, *outboxItem) + jsonData, err := views.RenderActivity(h.Name, cfg.Domain, *outboxItem) + if err != nil { + logger.Error(err) + return + } + payload, err := json.Marshal(jsonData) if err != nil { logger.Error(err) return } for _, v := range keys { parts := strings.Split(string(v), ":") - url := strings.Join(parts[2:], "") - logger.With("payload", payload).With("inboxUrl", url).Debugf("delivering activity") - + joinedUrl := strings.Join(parts[2:], "") + logger.With("payload", payload).With("inboxUrl", joinedUrl).Debugf("delivering activity") + + actorKeyUrl, err := urls.UrlProfileKey(h.Name, cfg.Domain) + if err != nil { + logger.Error(err) + return + } + parsedUrl, err := url.Parse(joinedUrl) + if err != nil { + logger.Error(err) + return + } + request, err := signed.SignedRequest(h, []byte(payload), parsedUrl, actorKeyUrl) + if err != nil { + logger.Error(err) + return + } + + client := &http.Client{} + response, err := client.Do(request) + if err != nil { + logger.Errorf("could not send activity request: %w", err) + return + } + responseBody, err := io.ReadAll(response.Body) + defer response.Body.Close() + logger.With("response", responseBody).With("status", response.Status).Debugf("remote inbox response received") } }) if err != nil { -- cgit v1.2.3