|
package internal |
|
|
|
import ( |
|
"bufio" |
|
"bytes" |
|
"fmt" |
|
"io" |
|
"log/slog" |
|
"net/http" |
|
"os" |
|
"os/exec" |
|
"strconv" |
|
"strings" |
|
"sync/atomic" |
|
"syscall" |
|
"time" |
|
|
|
"github.com/go-resty/resty/v2" |
|
"github.com/gogf/gf/container/gmap" |
|
"github.com/google/uuid" |
|
) |
|
|
|
// Worker tracks one worker process launched for a channel.
type Worker struct {
	// ChannelName identifies the channel; it is the key of the worker in the
	// workers map and the prefix of its output lines.
	ChannelName string
	// HttpServerPort is the local port of the worker's HTTP server that
	// update posts commands to.
	HttpServerPort int32
	// LogFile receives the worker's output (opened for append) unless
	// Log2Stdout is set, in which case the server's stdout/stderr are used.
	LogFile    string
	Log2Stdout bool
	// PropertyJsonFile is passed to the worker executable as --property.
	PropertyJsonFile string
	// GraphName is set by callers; presumably the graph the worker runs — confirm.
	GraphName string
	// Pid is the pid of the launched process, also used as its process-group
	// id when stopping. QuitTimeoutSeconds is how long UpdateTs may lag the
	// current time before timeoutWorkers stops the worker
	// (WORKER_TIMEOUT_INFINITY disables this).
	Pid, QuitTimeoutSeconds int
	// CreateTs and UpdateTs are Unix timestamps in seconds.
	CreateTs, UpdateTs int64
}
|
|
|
// WorkerUpdateReqTen is the "_ten" section of a WorkerUpdateReq.
type WorkerUpdateReqTen struct {
	Name string `form:"name,omitempty" json:"name,omitempty"`
	Type string `form:"type,omitempty" json:"type,omitempty"`
}

// WorkerUpdateReq is the body that update sends, as JSON, to a worker's /cmd
// endpoint. Collection and FileName have no omitempty on their json tags, so
// they are always present in the JSON encoding; the other fields are dropped
// when empty.
type WorkerUpdateReq struct {
	RequestId   string              `form:"request_id,omitempty" json:"request_id,omitempty"`
	ChannelName string              `form:"channel_name,omitempty" json:"channel_name,omitempty"`
	Collection  string              `form:"collection,omitempty" json:"collection"`
	FileName    string              `form:"filename,omitempty" json:"filename"`
	Path        string              `form:"path,omitempty" json:"path,omitempty"`
	Ten         *WorkerUpdateReqTen `form:"_ten,omitempty" json:"_ten,omitempty"`
}
|
|
|
// Worker process settings.
const (
	// workerExec is the launcher started (from /app/agents) for each worker.
	workerExec = "/app/agents/bin/start"

	// workerHttpServerUrl is the scheme and host of a worker's HTTP server;
	// the port and path are appended per request.
	workerHttpServerUrl = "http://127.0.0.1"

	// workerCleanSleepSeconds is the pause between timeout sweeps. It is left
	// untyped so it can be multiplied by time.Second directly.
	workerCleanSleepSeconds = 5
)
|
|
|
var ( |
|
workers = gmap.New(true) |
|
httpServerPort = httpServerPortMin |
|
httpServerPortMin = int32(10000) |
|
httpServerPortMax = int32(30000) |
|
) |
|
|
|
// newWorker builds a Worker for channelName that has not been started yet
// (Pid stays 0 until start succeeds). CreateTs and UpdateTs are both set to
// the same current Unix time in seconds, and QuitTimeoutSeconds defaults to 60.
func newWorker(channelName string, logFile string, log2Stdout bool, propertyJsonFile string) *Worker {
	// Read the clock once so the two timestamps cannot straddle a second
	// boundary and disagree.
	now := time.Now().Unix()
	return &Worker{
		ChannelName:        channelName,
		LogFile:            logFile,
		Log2Stdout:         log2Stdout,
		PropertyJsonFile:   propertyJsonFile,
		QuitTimeoutSeconds: 60,
		CreateTs:           now,
		UpdateTs:           now,
	}
}
|
|
|
// getHttpServerPort returns the next port of the round-robin sequence over
// [httpServerPortMin, httpServerPortMax], wrapping to httpServerPortMin after
// httpServerPortMax.
//
// The read-modify-write is a compare-and-swap loop. Each successful call
// therefore claims its own step of the sequence, and the result never leaves
// the range, even with concurrent callers. The previous Load/Store/Add
// sequence plus a plain read could return max+1 or another caller's value.
func getHttpServerPort() int32 {
	for {
		cur := atomic.LoadInt32(&httpServerPort)
		next := cur + 1
		// Also covers a counter that starts below the range or overflowed.
		if next < httpServerPortMin || next > httpServerPortMax {
			next = httpServerPortMin
		}
		if atomic.CompareAndSwapInt32(&httpServerPort, cur, next) {
			return next
		}
	}
}
|
|
|
|
|
type PrefixWriter struct { |
|
prefix string |
|
writer io.Writer |
|
} |
|
|
|
|
|
func (pw *PrefixWriter) Write(p []byte) (n int, err error) { |
|
|
|
scanner := bufio.NewScanner(strings.NewReader(string(p))) |
|
var totalWritten int |
|
|
|
for scanner.Scan() { |
|
|
|
line := fmt.Sprintf("[%s] %s", pw.prefix, scanner.Text()) |
|
|
|
n, err := pw.writer.Write([]byte(line + "\n")) |
|
totalWritten += n |
|
|
|
if err != nil { |
|
return totalWritten, err |
|
} |
|
} |
|
|
|
|
|
if err := scanner.Err(); err != nil { |
|
return totalWritten, err |
|
} |
|
|
|
return len(p), nil |
|
} |
|
|
|
|
|
// isInProcessGroup reports whether process pid currently belongs to process
// group pgid. Any getpgid failure, such as a process that no longer exists,
// yields false.
func isInProcessGroup(pid, pgid int) bool {
	group, err := syscall.Getpgid(pid)
	return err == nil && group == pgid
}
|
|
|
// start launches the worker process described by w.
//
// The command runs through `sh -c`: it changes to /app/agents and then runs
// workerExec with --property w.PropertyJsonFile. Setpgid is set, so the `sh`
// process leads a new process group whose id equals its pid; stop signals
// that whole group.
//
// Every output line is prefixed with "[<ChannelName>] ". Output goes to the
// server's stdout/stderr when Log2Stdout is set. Otherwise it is appended to
// LogFile, falling back to stdout/stderr if LogFile cannot be opened.
//
// On success, w.Pid holds the group id. A goroutine then waits for the
// process to exit, closes the log file, and removes w from workers.
func (w *Worker) start(req *StartReq) (err error) {
	// NOTE(review): PropertyJsonFile is spliced into a shell command without
	// quoting; it must never contain caller-controlled text.
	shell := fmt.Sprintf("cd /app/agents && %s --property %s", workerExec, w.PropertyJsonFile)
	slog.Info("Worker start", "requestId", req.RequestId, "shell", shell, logTag)
	cmd := exec.Command("sh", "-c", shell)
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid: true,
	}

	var stdoutWriter, stderrWriter io.Writer = os.Stdout, os.Stderr
	var logFile *os.File
	if !w.Log2Stdout {
		// Open into locals and publish to the outer logFile only on success.
		// The exit goroutine below closes logFile, and a failed open must not
		// leave the writers pointing at a nil *os.File.
		f, openErr := os.OpenFile(w.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
		if openErr != nil {
			slog.Error("Failed to open log file", "err", openErr, "requestId", req.RequestId, logTag)
		} else {
			logFile = f
			stdoutWriter, stderrWriter = f, f
		}
	}

	// Fix the prefix before the process starts. exec copies the child's output
	// on its own goroutines, so changing the prefix later would race with
	// PrefixWriter.Write.
	cmd.Stdout = &PrefixWriter{prefix: w.ChannelName, writer: stdoutWriter}
	cmd.Stderr = &PrefixWriter{prefix: w.ChannelName, writer: stderrWriter}

	if err = cmd.Start(); err != nil {
		slog.Error("Worker start failed", "err", err, "requestId", req.RequestId, logTag)
		if logFile != nil {
			logFile.Close()
		}
		return
	}

	// Record the group leader's pid before the (up to ~10 s) child lookup
	// below, so the worker never carries Pid 0 while its process is running.
	pid := cmd.Process.Pid
	w.Pid = pid

	// Best effort: find the child that sh started and confirm it joined the
	// new process group. The result is only logged.
	shell = fmt.Sprintf("pgrep -P %d", pid)
	slog.Info("Worker get pid", "requestId", req.RequestId, "shell", shell, logTag)

	const maxPidAttempts = 10
	var subprocessPid int
	found := false
	for i := 0; i < maxPidAttempts; i++ {
		subprocessPid = 0
		if output, cmdErr := exec.Command("sh", "-c", shell).CombinedOutput(); cmdErr == nil {
			// pgrep prints one pid per line; inspect the first. Atoi yields 0
			// on failure, which the check below rejects.
			if fields := strings.Fields(string(output)); len(fields) > 0 {
				subprocessPid, _ = strconv.Atoi(fields[0])
			}
		}
		if subprocessPid > 0 && isInProcessGroup(subprocessPid, pid) {
			found = true
			break
		}
		slog.Warn("Worker get pid failed, retrying...", "attempt", i+1, "pid", pid, "subpid", subprocessPid, "requestId", req.RequestId, logTag)
		if i < maxPidAttempts-1 {
			time.Sleep(time.Second)
		}
	}
	if !found {
		slog.Warn("Worker subprocess not found", "pid", pid, "requestId", req.RequestId, logTag)
	}

	go func() {
		if waitErr := cmd.Wait(); waitErr != nil {
			slog.Error("Worker process failed", "err", waitErr, "requestId", req.RequestId, logTag)
		} else {
			slog.Info("Worker process completed successfully", "requestId", req.RequestId, logTag)
		}

		if logFile != nil {
			logFile.Close()
		}

		// Remove the entry only if it still refers to this worker. The channel
		// may already have been stopped and started again with a new Worker.
		if workers.Get(w.ChannelName) == w {
			workers.Remove(w.ChannelName)
		}
	}()

	return
}
|
|
|
// stop SIGKILLs the worker's whole process group (kill(-w.Pid)) and removes
// channelName from workers.
//
// If the group no longer exists (ESRCH), the worker counts as already stopped
// and is still removed. A worker without a positive pid is refused with an
// error: kill(0, ...) would signal the server's own process group. On any
// other kill error the entry is kept so the caller can retry.
func (w *Worker) stop(requestId string, channelName string) (err error) {
	slog.Info("Worker stop start", "channelName", channelName, "requestId", requestId, "pid", w.Pid, logTag)

	if w.Pid <= 0 {
		err = fmt.Errorf("worker has no valid pid: %d", w.Pid)
		slog.Error("Worker kill failed", "err", err, "channelName", channelName, "worker", w, "requestId", requestId, logTag)
		return
	}

	err = syscall.Kill(-w.Pid, syscall.SIGKILL)
	if err == syscall.ESRCH {
		// Nothing left to kill (e.g. the worker already exited); just untrack it.
		slog.Warn("Worker process group not found", "channelName", channelName, "pid", w.Pid, "requestId", requestId, logTag)
		err = nil
	}
	if err != nil {
		slog.Error("Worker kill failed", "err", err, "channelName", channelName, "worker", w, "requestId", requestId, logTag)
		return
	}

	workers.Remove(channelName)

	slog.Info("Worker stop end", "channelName", channelName, "worker", w, "requestId", requestId, logTag)
	return
}
|
|
|
// update POSTs req as JSON to the worker's local command endpoint,
// workerHttpServerUrl + ":<w.HttpServerPort>/cmd". A transport error or any
// non-200 status is returned. A deferred handler also logs it once.
func (w *Worker) update(req *WorkerUpdateReq) (err error) {
	slog.Info("Worker update start", "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
	defer func() {
		if err == nil {
			return
		}
		slog.Error("Worker update error", "err", err, "channelName", req.ChannelName, "requestId", req.RequestId, logTag)
	}()

	endpoint := fmt.Sprintf("%s:%d/cmd", workerHttpServerUrl, w.HttpServerPort)
	var resp *resty.Response
	resp, err = HttpClient.R().
		SetHeader("Content-Type", "application/json").
		SetBody(req).
		Post(endpoint)
	switch {
	case err != nil:
		return err
	case resp.StatusCode() != http.StatusOK:
		return fmt.Errorf("%s, status: %d", codeErrHttpStatusNotOk.msg, resp.StatusCode())
	}

	slog.Info("Worker update end", "channelName", req.ChannelName, "worker", w, "requestId", req.RequestId, logTag)
	return nil
}
|
|
|
|
|
// getRunningWorkerPIDs returns, as a set, the pids of processes whose
// `ps aux` line contains "bin/worker --property".
//
// It returns nil whenever the shell pipeline fails. That includes the case
// where nothing matches, because the final grep exits non-zero when it
// selects no lines.
func getRunningWorkerPIDs() map[int]struct{} {
	var stdout bytes.Buffer
	ps := exec.Command("sh", "-c", `ps aux | grep "bin/worker --property" | grep -v grep`)
	ps.Stdout = &stdout
	if ps.Run() != nil {
		return nil
	}

	found := make(map[int]struct{})
	for _, row := range strings.Split(stdout.String(), "\n") {
		cols := strings.Fields(row)
		if len(cols) <= 1 {
			continue
		}
		// The second column of `ps aux` is the PID.
		if pid, convErr := strconv.Atoi(cols[1]); convErr == nil {
			found[pid] = struct{}{}
		}
	}
	return found
}
|
|
|
|
|
// killProcess sends SIGKILL to the single process pid and logs the outcome.
//
// Non-positive pids are refused. kill(2) treats 0 as the caller's own
// process group and -1 as every process the caller may signal, and negative
// values as process groups.
func killProcess(pid int) {
	if pid <= 0 {
		slog.Warn("Refusing to kill invalid pid", "pid", pid)
		return
	}
	if err := syscall.Kill(pid, syscall.SIGKILL); err != nil {
		slog.Warn("Failed to kill process", "pid", pid, "error", err)
		return
	}
	slog.Info("Successfully killed process", "pid", pid)
}
|
|
|
// timeoutWorkers never returns. Every workerCleanSleepSeconds seconds it scans
// the workers map and stops each worker whose UpdateTs is more than
// QuitTimeoutSeconds in the past. Workers whose QuitTimeoutSeconds equals
// WORKER_TIMEOUT_INFINITY are skipped. A failed stop leaves the worker in the
// map, so it is retried on the next sweep.
func timeoutWorkers() {
	for {
		for _, channelName := range workers.Keys() {
			// An entry can be removed between Keys and Get, for example by
			// the goroutine start leaves waiting on the process. A checked
			// assertion skips the missing entry instead of panicking on a nil
			// interface.
			worker, ok := workers.Get(channelName).(*Worker)
			if !ok || worker.QuitTimeoutSeconds == WORKER_TIMEOUT_INFINITY {
				continue
			}

			nowTs := time.Now().Unix()
			if worker.UpdateTs+int64(worker.QuitTimeoutSeconds) >= nowTs {
				continue // still within its allowed idle time
			}

			if err := worker.stop(uuid.New().String(), channelName.(string)); err != nil {
				slog.Error("Timeout worker stop failed", "err", err, "channelName", channelName, logTag)
				continue
			}
			slog.Info("Timeout worker stop success", "channelName", channelName, "worker", worker, "nowTs", nowTs, logTag)
		}

		slog.Debug("Worker timeout check", "sleep", workerCleanSleepSeconds, logTag)
		time.Sleep(workerCleanSleepSeconds * time.Second)
	}
}
|
|
|
// CleanWorkers first stops every tracked worker. It then SIGKILLs each
// process listed by getRunningWorkerPIDs that does not belong to a worker
// still tracked (one whose stop failed).
//
// Tracked workers are matched by process group as well as by pid. Worker.Pid
// is the pid of the `sh -c` launcher, which start places in its own process
// group. That launcher's command line contains "bin/start --property", not
// "bin/worker --property", so its pid is never among the listed ones. The
// listed processes are presumably its descendants, which share its group.
func CleanWorkers() {
	for _, channelName := range workers.Keys() {
		// The entry may have been removed concurrently since Keys was read.
		worker, ok := workers.Get(channelName).(*Worker)
		if !ok {
			continue
		}
		if err := worker.stop(uuid.New().String(), channelName.(string)); err != nil {
			slog.Error("Worker cleanWorker failed", "err", err, "channelName", channelName, logTag)
			continue
		}
		slog.Info("Worker cleanWorker success", "channelName", channelName, "worker", worker, logTag)
	}

	runningPIDs := getRunningWorkerPIDs()

	// Process-group ids (Worker.Pid) of workers that are still tracked.
	trackedGroups := make(map[int]struct{})
	for _, channelName := range workers.Keys() {
		if worker, ok := workers.Get(channelName).(*Worker); ok {
			trackedGroups[worker.Pid] = struct{}{}
		}
	}

	for pid := range runningPIDs {
		if _, tracked := trackedGroups[pid]; tracked {
			continue
		}
		// A getpgid failure means the process is gone or unreadable. Treat it
		// as untracked; killing a vanished pid only logs a failure.
		if pgid, err := syscall.Getpgid(pid); err == nil {
			if _, tracked := trackedGroups[pgid]; tracked {
				continue
			}
		}
		slog.Info("Killing redundant process", "pid", pid)
		killProcess(pid)
	}
}
|
|