从上学期开始,学校的教务系统更新了。新版的正方多了一个负载均衡的功能,但无奈实在做的太差了!为什么这么说尼? 因为在上学期抢课的时候入口网关依旧拉闸,而真正的后端服务器访问速度比德芙还丝滑。因此暂将锅甩给负载均衡服务器。既然学校的负载均衡拉闸,那么就有了 “帮学校做负载均衡” 的想法。 大体的思路就是,通过扫描内网网段发现隐藏的地址,然后负载均衡这些隐藏的地址。 既然是负载均衡,第一时间想到的就是Nginx
了,通过资料搜集,大致确定了基本的流程,因此有了这么的这个流程图:
需要解决的问题 由于非官方的代理,因此我们无法知道教务系统部署在内网的哪些机器上,而校园网一般使用B类网,因此需要扫描大量的ip地址。 但这样无疑会触发学校网管的报警,正常情况下会导致ip短时间内被封。导致扫描质量低(由于被禁网,导致程序误认为扫描超时从而导致目标未被发现)。 基于这个问题,随之而生的想法就是分布式扫描,将扫描的工作打散到用户中,服务器只负责扫描几个主干网段。 一句话概括就是: 将扫描工作分散到多台机器上 ,流程图如下:
Todo
前情提示: 本文通信均采用HTTP、代码部分存在伪代码 代码仓库: (暂不开源)
注册中心 -> 服务注册 与服务发现
日志服务 -> 记录日志
数据库服务 -> 数据库操作(CRUD)
扫描器服务 -> 发现教务系统地址
测试器服务 -> 测试数据库中地址的健康度
Api服务 -> 向consul
发送负载信息
大致结构如下:
注册中心 作为一个分布式系统,不可避免地需要服务注册 与服务发现 。因此需要一个注册中心
来处理各个服务
之间的依赖关系,在服务上线后通知依赖这个服务的服务(这里有点绕)举个例子 :数据库
的每个操作都需要记录日志,日志为了统一管理所以有一个日志服务
专门处理日志信息。此时,日志服务
因某些原因(可能是人为、也可能是网络掉线等)在数据库服务
注册之后才注册,这是注册中心
就需要通知数据库
,让数据库
的日志记录转成使用日志服务
。
功能与实现 作为一个注册中心,首先我们需要一个web服务来接收服务发送的信息(注册、依赖更新、注销等等),但在这之前我们先来定义一下我们要用到的结构(在面向对象中为类)
registry
来表示注册操作
registrations
来存放注册的服务
add
注册服务
notify
事件通知
sendRequiredServices
发送依赖的服务
sendPatch
发送依赖项
remove
移除(注销)服务
Heartbeat
心跳包
Registration
表示服务注册结构体
ServiceName
服务名
ServiceURL
服务地址
RequiredServices
[数组]服务依赖项
ServiceUpdateURL
服务与注册中心沟通的URL
HeartbeatURL
心跳检测地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 type registry struct { registrations []Registration mutex *sync.RWMutex } type Registration struct { ServiceName ServiceName ServiceURL string RequiredServices []ServiceName ServiceUpdateURL string HeartbeatURL string } type ServiceName string func (r *registry) add(reg Registration) error func (r registry) notify(fullpatch patch) func (r registry) sendRequiredServices(reg Registration) error func (r registry) sendPatch(p patch, url string ) error func (r *registry) remove(url string ) error func (r *registry) Heartbeat(freq time.Duration)
既然注册中心是作为一个web服务实现的,那么肯定是需要一个web server
的,由于项目属于玩票性质,也不算大因此使用Go内置的net/http
来实现,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 type RegistryService struct {}func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Printf("Request received" ) switch r.Method { case http.MethodPost: decoder := json.NewDecoder(r.Body) var r Registration err := decoder.Decode(&r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } log.Printf("Adding service: %v with URL: %s\n" , r.ServiceName, r.ServiceURL) err = reg.add(r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } case http.MethodDelete: payload, err := ioutil.ReadAll(r.Body) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return } url := string (payload) log.Printf("Removing service at URL: %s" , url) err = reg.remove(url) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return } default : w.WriteHeader(http.StatusMethodNotAllowed) return } }
服务组件 对服务来说,就是根据注册中心定好的规则来注册服务,然后根据自身的依赖来处理对应的功能。 因为要处理相应的依赖,因此除了Registration
外,再定义一个处理服务依赖的结构体及方法: providers
.
1 2 3 4 5 6 7 8 9 10 type providers struct { services map [ServiceName][]string mutex *sync.RWMutex } func (p *providers) Update(pat patch)func (p providers) get(name ServiceName) (string , error )func GetProvide (name ServiceName) (string , error )
组件的web服务 因为每个服务都需要使用注册 这些通用的功能,且这部分的工作都是重复的,因此将web抽出来公用.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 func Start (ctx context.Context, host, port string , reg registry.Registration, RegisterHandlersFunc func () ) (context.Context, error ) { RegisterHandlersFunc() ctx = startService(ctx, reg.ServiceName, host, port) err := registry.RegisterService(reg) if err != nil { return ctx, err } return ctx, nil } func startService (ctx context.Context, ServiceName registry.ServiceName, host, port string ) context.Context { ctx, cancel := context.WithCancel(ctx) var srv http.Server srv.Addr = ":" + port address := fmt.Sprintf("http://%s:%s" , host, port) go func () { log.Println(srv.ListenAndServe()) err := registry.ShutdownService(address) if err != nil { log.Println(err) } cancel() }() go func () { fmt.Printf("Service is running in %v\n" , address) fmt.Println("Registry service started. Press any key to stop." ) var s string fmt.Scanln(&s) srv.Shutdown(ctx) err := registry.ShutdownService(address) if err != nil { log.Println(err) } cancel() }() return ctx }
至此,可以开始专心的写业务了
扫描器 得益于Go高并发的优势,开启数百万个的goroutine
的开销也不会很大,非常的轻量!🛫 因此代码实现起来很轻松,大体思路和端口扫描器 类似,在此基础上根据目标特征添加判断条件即可 端口扫描器代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 ... func worker (ports chan int , results chan int ) { for p := range ports { address := fmt.Sprintf("192.168.2.122:%d" , p) fmt.Printf("start %s\n" , address) conn, err := net.DialTimeout("tcp" , address, time.Second*5 ) if err != nil { results <- 0 continue } conn.Close() results <- p } } func main () { ports := make (chan int , 30000 ) results := make (chan int , 100 ) scanEndPort := 65535 var openPorts []int for i := 0 ; i < cap (ports); i++ { go worker(ports, results) } go func () { for i := 1 ; i <= scanEndPort; i++ { ports <- i } close (ports) }() for i := 1 ; i <= scanEndPort; i++ { res := <-results if res != 0 { openPorts = append (openPorts, res) } } close (results) sort.Ints(openPorts) for _, port := range openPorts { fmt.Printf("%d opend\n" , port) } }
测试器 测试器主要功能是从数据库中取出教务系统地址,然后测试。与扫描器不同仅在于扫描器是写,测试器是读。因此这部分内容和扫描器实际上是在同个包内的,只是逻辑上将它分离了出来。
这部分其实和consul
的功能是重复的,因此代码不过多赘述
数据库 由于校园网中,教务系统的地址不会太多,因此数据库的选择十分的随意(不存在性能方面的要求),所以这里使用自己熟悉的redis
作为数据库。由于本系统是和官方的负载均衡并行的 因此存在某些结点用于两者共同访问导致压力上涨,响应不及时,因此利用redis
的sorted-set
在测试的时候,将响应快的地址设置高分数,使用sorted-sets
的好处还有一个就是,集合的元素都是不重复的!,对于代理池来说,这个分数代表着教务系统地址稳定性的重要标准,因此设置分数的规则如下:
基本分:10分,最高分:20
测试器设置一个更严格的超时时间来判断结点是否流畅
分数基本分 为可用,检测器会定时循环检测每个代理可用情况,一旦检测到有可用的代理则立即置为最高分
新获取的代理分数设置为基本分 ,如果测试可行(流畅)则置为满分,不可行(超时)则分数减一
分数减到0后代理移除
由于是内网环境,分数与超时时间可以根据实际情况设置更严格
主要功能就是简单的CRUD啦,本文只讲逻辑与伪代码,实现部分就不多说了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 func max (addr discover.Addr) (err error ) { member, err := addr.MarshalBinary() if err != nil { return err } err = rdb.ZAdd(ctx, redisConfig.Key, &redis.Z{ Score: SCORE_MAX, Member: member, }).Err() if err != nil { return err } return nil } func decrease (addr discover.Addr) (err error ) { member, err := addr.MarshalBinary() if err != nil { return err } err = rdb.ZIncrBy(ctx, redisConfig.Key, -1 , string (member)).Err() if err != nil { return err } score := rdb.ZScore(ctx, redisConfig.Key, string (member)) if score.Val() <= 0.00 { log.Printf("%v current score %v, remove.\n" , addr, score.Val()) err := rdb.ZRem(ctx, redisConfig.Key, member).Err() if err != nil { return err } } return nil } func add (addrs discover.Addrs) (err error ) { for _, addr := range addrs { buf, err := addr.MarshalBinary() if err != nil { return nil } err = rdb.ZAdd(ctx, redisConfig.Key, &redis.Z{ Score: SCORE_DEFAULT, Member: buf, }).Err() if err != nil { return err } } return nil }
consul-upsync-nginx实现动态负载均衡 该部分可以说是系统实现的关键了,因为nginx
自带的负载均衡是写死的,不能根据后端情况动态调整,通过一番搜索对比,最终决定了consul
+ upsync
方案。
upsync
一个Nginx的模块(扩展)
consul
一个分布式高可用的系统
这部分仅限于“能用”阶段,笔者也不太懂,就不乱说啦! 主要就是Api服务 定时的获取数据库内容(分数作为权重),然后推送到consul 中
总结 该项目是学习Go时的一个练手项目,很多地方都不太好,因此仓库就不开源了🐕