前几天正在写博客评点哪吒,结果OVH放货了KS-A,等写完发现,已经卖完了。这不得不拍断大腿了。
赶紧补救,写个监控KS-A放货的玩意,下次一定要把握住了。
程序每分钟调用一次 OVH 的库存查询接口,一旦发现有货,就立刻通过 Telegram 发通知。
先只贴go源代码了,改天有空我看看是不是写个使用说明。
调用格式是 ./ovh -token=tg机器人的token -chatid=自己tg号的id
加上 -test 参数,程序会模拟加拿大机房(bhs)有货,给 tg 发一条通知,可以用来检查 tg 参数填得对不对。注意程序不会自动退出,测试模式下每分钟都会再发一条,确认收到后手动停掉即可。

package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"net/url"
	"os"
	"strings"
	"time"
)

// apiURL is the OVH (ca.ovh.com) dedicated-server availability endpoint,
// filtered to plan code 24ska01 with excludeDatacenters=false.
// checkAvailability fetches it and decodes the response as a JSON array of
// Server values.
const apiURL = "https://ca.ovh.com/engine/apiv6/dedicated/server/datacenter/availabilities/?excludeDatacenters=false&planCode=24ska01&server=24ska01"

// Wire types for the availability response. Only the JSON fields the monitor
// actually reads are declared; any other fields in the payload are ignored by
// encoding/json.
type (
	// Datacenter is one entry of a server's "datacenters" array: the
	// datacenter code together with its availability string.
	Datacenter struct {
		Availability string `json:"availability"`
		Datacenter   string `json:"datacenter"`
	}

	// Server is one element of the top-level response array and carries the
	// per-datacenter availability list.
	Server struct {
		Datacenters []Datacenter `json:"datacenters"`
	}
)

// checkAvailability performs one poll of the availability endpoint (apiURL)
// and sends a Telegram notification for the first datacenter whose
// availability is anything other than "unavailable".
//
// When testMode is true, the availability of every "bhs" datacenter in the
// decoded response is overwritten with "72H" before the scan, so the
// notification path can be exercised while nothing is in stock. The real
// response is still fetched first, so test mode also needs the API to be
// reachable and to list a "bhs" datacenter.
//
// telegramBotToken and chatID are passed through unchanged to
// sendTelegramNotification.
//
// Every failure is logged and ends the current poll; nothing is returned.
func checkAvailability(testMode bool, telegramBotToken, chatID string) {
	log.Println("Starting availability check...")

	// http.Get goes through http.DefaultClient, which has no timeout: a single
	// stalled connection would block the caller's polling loop forever. A
	// client with a nil Transport still shares http.DefaultTransport.
	client := &http.Client{Timeout: 30 * time.Second}

	resp, err := client.Get(apiURL)
	if err != nil {
		log.Println("Failed to fetch data:", err)
		return
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Println("Failed to read response body:", err)
		return
	}

	log.Println("Received JSON response:", string(body))

	// Report a non-200 reply as what it is instead of letting an error
	// document surface below as a confusing JSON parse failure.
	if resp.StatusCode != http.StatusOK {
		log.Printf("Unexpected HTTP status from availability API: %s\n", resp.Status)
		return
	}

	var servers []Server
	if err := json.Unmarshal(body, &servers); err != nil {
		log.Println("Failed to parse JSON:", err)
		return
	}

	// In test mode, force the availability of the bhs datacenter to 72H.
	if testMode {
		log.Println("Test mode enabled. Forcing bhs datacenter availability to 72H.")
		forced := false
		for i := range servers {
			for j := range servers[i].Datacenters {
				if servers[i].Datacenters[j].Datacenter == "bhs" {
					servers[i].Datacenters[j].Availability = "72H"
					forced = true
				}
			}
		}
		if !forced {
			// Without this hint a missing bhs entry looks exactly like
			// misconfigured Telegram parameters.
			log.Println("Test mode: no bhs datacenter in the response, nothing was forced.")
		}
	}

	for _, server := range servers {
		for _, dc := range server.Datacenters {
			log.Printf("Checking datacenter: %s, availability: %s\n", dc.Datacenter, dc.Availability)
			if dc.Availability != "unavailable" {
				log.Printf("Availability change detected: %s in datacenter: %s\n", dc.Availability, dc.Datacenter)
				sendTelegramNotification(dc.Datacenter, dc.Availability, telegramBotToken, chatID)
				return // One notification per poll: stop at the first available datacenter.
			}
		}
	}

	log.Println("No availability changes detected.")
}

// sendTelegramNotification posts a plain-text availability message to a
// Telegram chat through the Bot API sendMessage method.
//
// datacenter and availability are interpolated into the message text;
// telegramBotToken selects the bot and chatID the destination chat. The
// outcome is only logged; nothing is returned to the caller.
//
// Success is logged only when the API answers with HTTP 200. Any other status
// (for example a wrong token or chat id) is logged together with the response
// body so that a misconfiguration is visible.
func sendTelegramNotification(datacenter, availability, telegramBotToken, chatID string) {
	msg := fmt.Sprintf("Server availability changed to: %s in datacenter: %s", availability, datacenter)
	telegramAPI := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", telegramBotToken)

	log.Printf("Sending notification: %s\n", msg)

	form := url.Values{}
	form.Set("chat_id", chatID)
	form.Set("text", msg)

	// Bound the request so a stalled connection cannot hang the caller. A
	// client with a nil Transport still shares http.DefaultTransport.
	client := &http.Client{Timeout: 15 * time.Second}

	resp, err := client.Post(telegramAPI, "application/x-www-form-urlencoded", strings.NewReader(form.Encode()))
	if err != nil {
		// The client wraps failures in *url.Error, whose text embeds the
		// request URL and therefore the bot token. Log only the cause.
		if urlErr, ok := err.(*url.Error); ok {
			err = urlErr.Err
		}
		log.Println("Failed to send telegram message:", err)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		detail, readErr := ioutil.ReadAll(resp.Body)
		if readErr != nil {
			log.Printf("Telegram API rejected the message (%s); reading the error body failed: %v\n", resp.Status, readErr)
			return
		}
		log.Printf("Telegram API rejected the message (%s): %s\n", resp.Status, detail)
		return
	}

	log.Println("Notification sent successfully.")
}

// main reads the command-line flags, refuses to start without Telegram
// credentials, and then polls availability once a minute until the process is
// killed.
func main() {
	var (
		simulate   bool
		botToken   string
		targetChat string
	)

	// Register and parse the command-line flags.
	flag.BoolVar(&simulate, "test", false, "Enable test mode to simulate availability change")
	flag.StringVar(&botToken, "token", "", "Telegram bot token")
	flag.StringVar(&targetChat, "chatid", "", "Telegram chat ID")
	flag.Parse()

	// Both the token and the chat id are mandatory.
	if haveCredentials := botToken != "" && targetChat != ""; !haveCredentials {
		fmt.Println("Usage: ./exename -token=<telegramBotToken> -chatid=<chatID> [-test]")
		os.Exit(1)
	}

	// The post statement runs after every pass, so each check is followed by
	// a one-minute pause before the next one.
	for ; ; time.Sleep(1 * time.Minute) {
		checkAvailability(simulate, botToken, targetChat)
	}
}

标签: none

暂无评论

添加新评论