package cgi import ( "context" "encoding/json" "fmt" "io" "io/ioutil" "net" "net/http" "net/http/cgi" "net/url" "os" "strings" "sync" "time" "git.capotej.com/capotej/communique/config" "git.capotej.com/capotej/communique/delivery" "git.capotej.com/capotej/communique/models" "git.capotej.com/capotej/communique/urls" "git.capotej.com/capotej/communique/views" "github.com/mmcdole/gofeed" "go.uber.org/zap" ) type Servers struct { log *zap.SugaredLogger persister *models.Persister cfg config.Config } func NewServers(log *zap.SugaredLogger, persister *models.Persister, cfg config.Config) *Servers { return &Servers{log: log, persister: persister, cfg: cfg} } // Start iterates over all Handlers and starts an internal CGI server for each one // along with ticker for the configured handler interval then blocks indefinitely func (s *Servers) Start() { var wg sync.WaitGroup logger := s.log.With("type", "cgi") signed, err := delivery.NewSigned(s.persister) if err != nil { panic(err) return } for _, handler := range s.cfg.Handlers { handlerLogger := logger.With("handler", handler.Name) wg.Add(3) // Internal CGI server go func(aHandler config.Handler) { defer wg.Done() startCGIServer(aHandler, handlerLogger) }(handler) // Ticker go func(aHandler config.Handler) { defer wg.Done() startTicker(aHandler, s.persister, handlerLogger, s.cfg, signed) }(handler) // Execute a handler tick on start since Go's ticker waits until $interval to trigger first tick go func(aHandler config.Handler) { defer wg.Done() time.Sleep(1 * time.Second) output := tick(aHandler, handlerLogger) err := processTick(aHandler, output, s.persister, handlerLogger, s.cfg, signed) if err != nil { s.log.Error(err) } }(handler) } wg.Wait() } func startCGIServer(h config.Handler, log *zap.SugaredLogger) { cgiHandler := cgi.Handler{Path: h.Exec} server := http.Server{ Handler: &cgiHandler, } sock := fmt.Sprintf("%s.sock", h.Name) log.Debugf("starting cgi server at %s", sock) os.Remove(sock) unixListener, err := net.Listen("unix", sock) if err != nil { panic(err) } server.Serve(unixListener) } func startTicker(h config.Handler, persister *models.Persister, log *zap.SugaredLogger, cfg config.Config, signed *delivery.Signed) { ticker := time.NewTicker(h.Interval) // TODO add some random jitter here so handlers dont run at the same exact intervals done := make(chan bool) func() { for { select { case <-done: return case _ = <-ticker.C: output := tick(h, log) err := processTick(h, output, persister, log, cfg, signed) if err != nil { log.Error(err) } } } }() } func processTick(h config.Handler, output []byte, persister *models.Persister, log *zap.SugaredLogger, cfg config.Config, signed *delivery.Signed) error { fp := gofeed.NewParser() fp.ParseString(string(output)) feed, err := fp.ParseString(string(output)) if err != nil { return err } for _, v := range feed.Items { var extractedContent string // if there is no content, use descrption if len(v.Content) != 0 { extractedContent = v.Content } else if len(v.Description) != 0 { extractedContent = v.Description } if len(extractedContent) != 0 { log.Debugf("extracted content '%s'", extractedContent) outboxItem := models.CreateOutboxItem(h, []byte(extractedContent)) err = persister.StoreWithCallback(outboxItem, func() { logger := log.With("handler", h.Name).With("type", "subscription") // go through handler subscriptions and deliver signed outbox items logger.Debug("callback for save") sub := models.NewSubscription(h) keys, err := persister.CollectKeys(sub) if err != nil { logger.Error(err) return } jsonData, err := views.RenderActivity(h.Name, cfg.Domain, *outboxItem) if err != nil { logger.Error(err) return } payload, err := json.Marshal(jsonData) if err != nil { logger.Error(err) return } for _, v := range keys { parts := strings.Split(string(v), ":") joinedUrl := strings.Join(parts[2:], "") logger.With("payload", payload).With("inboxUrl", joinedUrl).Debugf("delivering activity") actorKeyUrl, err := urls.UrlProfileKey(h.Name, cfg.Domain) if err != nil { logger.Error(err) return } parsedUrl, err := url.Parse(joinedUrl) if err != nil { logger.Error(err) return } request, err := signed.SignedRequest(h, []byte(payload), parsedUrl, actorKeyUrl) if err != nil { logger.Error(err) return } client := &http.Client{} response, err := client.Do(request) if err != nil { logger.Errorf("could not send activity request: %w", err) return } responseBody, err := io.ReadAll(response.Body) defer response.Body.Close() logger.With("response", responseBody).With("status", response.Status).Debugf("remote inbox response received") } }) if err != nil { return err } } } return nil } func tick(h config.Handler, log *zap.SugaredLogger) []byte { sock := fmt.Sprintf("%s.sock", h.Name) httpc := http.Client{ Transport: &http.Transport{ DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { return net.Dial("unix", sock) }, }, } var response *http.Response var err error log.Debugf("executing cgi handler at %s", sock) response, err = httpc.Get("http://unix/" + sock) if err != nil { log.Errorf("received error from cgi handler %s", err) } body, err := ioutil.ReadAll(response.Body) response.Body.Close() return body }