Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/go_modules/github.com/golang-jw…
Browse files Browse the repository at this point in the history
…t/jwt/v4-4.5.1
  • Loading branch information
Xe authored Dec 11, 2024
2 parents 2100673 + 27eb123 commit 2c193f9
Show file tree
Hide file tree
Showing 51 changed files with 1,459 additions and 73 deletions.
11 changes: 11 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]

[dev-packages]

[requires]
python_version = "3.13"
14 changes: 8 additions & 6 deletions cmd/mi/services/posse/posse.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@ func New(ctx context.Context, dao *models.DAO, cfg Config) (*Announcer, error) {
}

func (a *Announcer) Announce(ctx context.Context, it *jsonfeed.Item) (*emptypb.Empty, error) {
switch {
case strings.Contains(it.GetUrl(), "svc.alrest.xeserv.us"),
strings.Contains(it.GetUrl(), "shark-harmonic.ts.net"),
strings.Contains(it.GetUrl(), "preview.xeiaso.net"):
slog.Info("skipping announcement", "url", it.GetUrl(), "reason", "staging URLs")
u, err := url.Parse(it.GetUrl())
if err != nil {
slog.Error("[unexpected] can't parse URL", "err", err, "url", it.GetUrl())
return &emptypb.Empty{}, nil
}

if u.Host != "xeiaso.net" {
slog.Info("skipping announcement", "url", it.GetUrl(), "reason", "non-prod URLs")
return &emptypb.Empty{}, nil
}
if has, err := a.dao.HasBlogpost(ctx, it.GetUrl()); err != nil {
return nil, err
} else if has {
Expand Down Expand Up @@ -154,7 +156,7 @@ func (a *Announcer) Announce(ctx context.Context, it *jsonfeed.Item) (*emptypb.E
g.Go(func() error {
if _, err := a.mimi.Announce(gCtx, it); err != nil {
slog.Error("failed to announce to mimi", "err", err)
return err
return nil
}
possePosts.WithLabelValues("irc").Inc()
return nil
Expand Down
3 changes: 2 additions & 1 deletion cmd/mi/services/twitchevents/twitchevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ func (s *Server) handleStreamUp(ctx context.Context, lg *slog.Logger, ev *helix.
if _, err := s.mimi.Post(gCtx, &announce.StatusUpdate{
Body: streamAnnouncement,
}); err != nil {
return fmt.Errorf("can't announce to Mimi: %w", err)
slog.Error("can't announce to Mimi", "err", err)
return nil
}

lg.Info("posted to Mimi")
Expand Down
2 changes: 2 additions & 0 deletions cmd/orodyagzou/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.env
gpu_names_cache.json
277 changes: 277 additions & 0 deletions cmd/orodyagzou/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"log/slog"
"net/http"
"net/http/httputil"
"net/url"
"os"
"sync"
"time"

"github.com/google/uuid"
"github.com/joho/godotenv"
"within.website/x/internal"
"within.website/x/web/vastai/vastaicli"
)

var (
bind = flag.String("bind", ":3238", "HTTP port to bind to")
diskSizeGB = flag.Int("vastai-disk-size-gb", 32, "amount of disk we need from vast.ai")
dockerImage = flag.String("docker-image", "reg.xeiaso.net/runner/sdxl-tigris:latest", "docker image to start")
onstartCmd = flag.String("onstart-cmd", "python -m cog.server.http", "onstart command to run in vast.ai")
vastaiPort = flag.Int("vastai-port", 5000, "port that the guest will use in vast.ai")
vastaiFilters = flag.String("vastai-filters", "verified=False cuda_max_good>=12.1 gpu_ram>=12 num_gpus=1 inet_down>=850", "vast.ai search filters")
)

func main() {
internal.HandleStartup()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if flag.NArg() != 1 {
fmt.Println("usage: orodyagzou [flags] <whatever.env>")
os.Exit(2)
}

fname := flag.Arg(0)
slog.Debug("using env file", "fname", fname)

env, err := godotenv.Read(fname)
if err != nil {
slog.Error("can't read env file", "fname", fname, "err", err)
os.Exit(1)
}

var cfg vastaicli.InstanceConfig

cfg.DiskSizeGB = *diskSizeGB
cfg.Environment = env
cfg.DockerImage = *dockerImage
cfg.OnStartCMD = *onstartCmd
cfg.Ports = append(cfg.Ports, *vastaiPort)

images := &ScaleToZeroProxy{
cfg: cfg,
}

go images.slayLoop(ctx)

mux := http.NewServeMux()
mux.Handle("/v1/images", images)

fmt.Printf("http://localhost%s\n", *bind)
log.Fatal(http.ListenAndServe(*bind, mux))
}

type ScaleToZeroProxy struct {
cfg vastaicli.InstanceConfig

// locked fields
lock sync.RWMutex
endpointURL string
instanceID int
ready bool
lastUsed time.Time
}

func (s *ScaleToZeroProxy) slayLoop(ctx context.Context) {
t := time.NewTicker(time.Minute)
defer t.Stop()

for {
select {
case <-ctx.Done():
slog.Error("context canceled", "err", ctx.Err())
return
case <-t.C:
s.lock.RLock()
ready := s.ready
lastUsed := s.lastUsed
s.lock.RUnlock()

if !ready {
continue
}

if lastUsed.Add(5 * time.Minute).Before(time.Now()) {
if err := s.slay(ctx); err != nil {
slog.Error("can't slay instance", "err", err)
}
}
}
}
}

func (s *ScaleToZeroProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.lock.RLock()
ready := s.ready
s.lock.RUnlock()

if !ready {
if err := s.mint(r.Context()); err != nil {
slog.Error("can't mint", "err", err)
http.Error(w, "can't mint", http.StatusInternalServerError)
return
}
}

s.lock.RLock()
endpointURL := s.endpointURL
s.lock.RUnlock()

u, err := url.Parse(endpointURL)
if err != nil {
slog.Error("can't url parse", "err", err, "url", s.endpointURL)
http.Error(w, "can't url parse", http.StatusInternalServerError)
return
}

next := httputil.NewSingleHostReverseProxy(u)
od := next.Director
next.Director = func(r *http.Request) {
od(r)
r.URL.Path = "/predictions/" + uuid.NewString()
}
next.ServeHTTP(w, r)

s.lock.Lock()
s.lastUsed = time.Now()
s.lock.Unlock()
}

func (s *ScaleToZeroProxy) mint(ctx context.Context) error {
s.lock.Lock()
defer s.lock.Unlock()

candidates, err := vastaicli.Search(ctx, *vastaiFilters, "dph+")
if err != nil {
return err
}

candidate := candidates[0]
slog.Info("found instance", "costDPH", candidate.DphTotal, "gpuName", candidate.GpuName)

instanceData, err := vastaicli.Mint(ctx, candidate.AskContractID, s.cfg)
if err != nil {
return err
}

slog.Info("created instance, waiting for things to settle", "id", instanceData.NewContract)

instance, err := s.delayUntilRunning(ctx, instanceData.NewContract)
if err != nil {
return err
}

addr, ok := instance.AddrFor(s.cfg.Ports[0])
if !ok {
return fmt.Errorf("somehow can't get port %d for instance %d, is god dead?", s.cfg.Ports[0], instance.ID)
}

s.endpointURL = "http://" + addr + "/"
s.ready = true
s.instanceID = instance.ID
s.lastUsed = time.Now().Add(5 * time.Minute)

if err := s.delayUntilReady(ctx, s.endpointURL); err != nil {
return fmt.Errorf("can't do healthcheck: %w", err)
}

slog.Info("ready", "endpointURL", s.endpointURL, "instanceID", s.instanceID)

return nil
}

func (s *ScaleToZeroProxy) slay(ctx context.Context) error {
s.lock.Lock()
defer s.lock.Unlock()

if err := vastaicli.Slay(ctx, s.instanceID); err != nil {
return err
}

s.endpointURL = ""
s.ready = false
s.lastUsed = time.Now()
s.instanceID = 0

slog.Info("instance slayed", "docker_image", s.cfg.DockerImage)

return nil
}

func (s *ScaleToZeroProxy) delayUntilReady(ctx context.Context, endpointURL string) error {
type cogHealthCheck struct {
Status string `json:"status"`
}

u, err := url.Parse(endpointURL)
if err != nil {
return fmt.Errorf("[unexpected] can't parse endpoint url %q: %w", endpointURL, err)
}

u.Path = "/health-check"

t := time.NewTicker(time.Second)
defer t.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-t.C:
resp, err := http.Get(u.String())
if err != nil {
return fmt.Errorf("can't fetch health check: %w", err)
}

var status cogHealthCheck
if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
return fmt.Errorf("can't parse health check response: %w", err)
}

if status.Status == "READY" {
slog.Info("health check passed")
return nil
}
}
}
}

func (s *ScaleToZeroProxy) delayUntilRunning(ctx context.Context, instanceID int) (*vastaicli.Instance, error) {
t := time.NewTicker(10 * time.Second)
defer t.Stop()

var instance *vastaicli.Instance
var err error

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-t.C:
instance, err = vastaicli.GetInstance(ctx, instanceID)
if err != nil {
return nil, err
}

slog.Debug("instance is cooking", "curr", instance.ActualStatus, "next", instance.NextState, "status", instance.StatusMsg)

if instance.ActualStatus == "running" {
_, ok := instance.AddrFor(s.cfg.Ports[0])
if !ok {
slog.Info("no addr", "ports", s.cfg.Ports)
continue
}

return instance, nil
}
}
}
}
2 changes: 2 additions & 0 deletions docs/bsky/didweb/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.terraform
*.tfvars
Loading

0 comments on commit 2c193f9

Please sign in to comment.