从上学期开始,学校的教务系统更新了。新版的正方多了一个负载均衡的功能,但无奈实在做的太差了!为什么这么说尼?
因为在上学期抢课的时候入口网关依旧拉闸,而真正的后端服务器访问速度比德芙还丝滑。因此暂将锅甩给负载均衡服务器。既然学校的负载均衡拉闸,那么就有了 “帮学校做负载均衡” 的想法。
大体的思路就是,通过扫描内网网段发现隐藏的地址,然后负载均衡这些隐藏的地址。
既然是负载均衡,第一时间想到的就是Nginx了,通过资料搜集,大致确定了基本的流程,因此有了这么的这个流程图:

需要解决的问题

由于非官方的代理,因此我们无法知道教务系统部署在内网的哪些机器上,而校园网一般使用B类网,因此需要扫描大量的ip地址。
但这样无疑会触发学校网管的报警,正常情况下会导致ip短时间内被封。导致扫描质量低(由于被禁网,导致程序误认为扫描超时从而导致目标未被发现)。
基于这个问题,随之而生的想法就是分布式扫描,将扫描的工作打散到用户中,服务器只负责扫描几个主干网段。
一句话概括就是: 将扫描工作分散到多台机器上,流程图如下:

Todo

前情提示: 本文通信均采用HTTP、代码部分存在伪代码
代码仓库: (暂不开源)

  1. 注册中心 -> 服务注册服务发现
  2. 日志服务 -> 记录日志
  3. 数据库服务 -> 数据库操作(CRUD)
  4. 扫描器服务 -> 发现教务系统地址
  5. 测试器服务 -> 测试数据库中地址的健康度
  6. Api服务 -> 向consul发送负载信息

大致结构如下:

注册中心

作为一个分布式系统,不可避免地需要服务注册服务发现。因此需要一个注册中心来处理各个服务之间的依赖关系,在服务上线后通知依赖这个服务的服务(这里有点绕)
举个例子:
数据库的每个操作都需要记录日志,日志为了统一管理所以有一个日志服务专门处理日志信息。此时,日志服务因某些原因(可能是人为、也可能是网络掉线等)在数据库服务注册之后才注册,这是注册中心就需要通知数据库,让数据库的日志记录转成使用日志服务

功能与实现

作为一个注册中心,首先我们需要一个web服务来接收服务发送的信息(注册、依赖更新、注销等等),但在这之前我们先来定义一下我们要用到的结构(在面向对象中为类)

  1. registry来表示注册操作
    • registrations来存放注册的服务
    • add 注册服务
    • notify 事件通知
    • sendRequiredServices 发送依赖的服务
    • sendPatch 发送依赖项
    • remove 移除(注销)服务
    • Heartbeat 心跳包
  2. Registration表示服务注册结构体
    • ServiceName 服务名
    • ServiceURL 服务地址
    • RequiredServices[数组]服务依赖项
    • ServiceUpdateURL 服务与注册中心沟通的URL
    • HeartbeatURL 心跳检测地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type registry struct {
registrations []Registration
mutex *sync.RWMutex
}
// 服务注册结构体
type Registration struct {
ServiceName ServiceName
ServiceURL string
RequiredServices []ServiceName
ServiceUpdateURL string
HeartbeatURL string
}
type ServiceName string
// 添加注册
func (r *registry) add(reg Registration) error

// 当依赖服务运行时,通知依赖者
func (r registry) notify(fullpatch patch)
// 注册中心向服务发送依赖相关内容
func (r registry) sendRequiredServices(reg Registration) error
// 发送依赖项
func (r registry) sendPatch(p patch, url string) error
// 移除注册(注销)
func (r *registry) remove(url string) error
// 心跳包
func (r *registry) Heartbeat(freq time.Duration)

既然注册中心是作为一个web服务实现的,那么肯定是需要一个web server的,由于项目属于玩票性质,也不算大因此使用Go内置的net/http来实现,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
type RegistryService struct{}
func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Printf("Request received")
switch r.Method {
case http.MethodPost:
decoder := json.NewDecoder(r.Body)
var r Registration
err := decoder.Decode(&r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
log.Printf("Adding service: %v with URL: %s\n", r.ServiceName, r.ServiceURL)
err = reg.add(r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
case http.MethodDelete:
payload, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
url := string(payload)
log.Printf("Removing service at URL: %s", url)
err = reg.remove(url)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}

服务组件

对服务来说,就是根据注册中心定好的规则来注册服务,然后根据自身的依赖来处理对应的功能。
因为要处理相应的依赖,因此除了Registration外,再定义一个处理服务依赖的结构体及方法: providers.

1
2
3
4
5
6
7
8
9
10
type providers struct {
services map[ServiceName][]string // 服务名->服务url
mutex *sync.RWMutex
}
// 更新依赖
func (p *providers) Update(pat patch)
// // 通过服务名称,找到依赖的urls,从依赖项里面随机返回一个url
func (p providers) get(name ServiceName) (string, error)
// 导出给外部使用
func GetProvide(name ServiceName) (string, error)

组件的web服务

因为每个服务都需要使用注册这些通用的功能,且这部分的工作都是重复的,因此将web抽出来公用.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func Start(ctx context.Context, host, port string,
reg registry.Registration,
RegisterHandlersFunc func()) (context.Context, error) {

RegisterHandlersFunc()
ctx = startService(ctx, reg.ServiceName, host, port)
err := registry.RegisterService(reg)
if err != nil {
return ctx, err
}
return ctx, nil
}

func startService(ctx context.Context, ServiceName registry.ServiceName,
host, port string) context.Context {
ctx, cancel := context.WithCancel(ctx)
var srv http.Server
srv.Addr = ":" + port
address := fmt.Sprintf("http://%s:%s", host, port)

go func() {
log.Println(srv.ListenAndServe())
// 关闭的时候要取消注册
// ... todo
err := registry.ShutdownService(address)
if err != nil {
log.Println(err)
}
cancel()
}()

go func() {
fmt.Printf("Service is running in %v\n", address)
fmt.Println("Registry service started. Press any key to stop.")
var s string
fmt.Scanln(&s)
srv.Shutdown(ctx)
err := registry.ShutdownService(address)
if err != nil {
log.Println(err)
}
cancel()
}()
return ctx
}

至此,可以开始专心的写业务了

扫描器

得益于Go高并发的优势,开启数百万个的goroutine的开销也不会很大,非常的轻量!🛫
因此代码实现起来很轻松,大体思路和端口扫描器类似,在此基础上根据目标特征添加判断条件即可
端口扫描器代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
...
func worker(ports chan int, results chan int) {
for p := range ports {
address := fmt.Sprintf("192.168.2.122:%d", p)
fmt.Printf("start %s\n", address)
conn, err := net.DialTimeout("tcp", address, time.Second*5)
if err != nil {
results <- 0
continue
}
conn.Close()
results <- p
}
}

func main() {
ports := make(chan int, 30000)
results := make(chan int, 100)
scanEndPort := 65535
var openPorts []int

for i := 0; i < cap(ports); i++ {
go worker(ports, results)
}
// push port to channel
go func() {
for i := 1; i <= scanEndPort; i++ {
ports <- i
}
close(ports)
}()

for i := 1; i <= scanEndPort; i++ {
res := <-results
if res != 0 {
openPorts = append(openPorts, res)
}
}
close(results)
sort.Ints(openPorts)
for _, port := range openPorts {
fmt.Printf("%d opend\n", port)
}
}

测试器

测试器主要功能是从数据库中取出教务系统地址,然后测试。与扫描器不同仅在于扫描器是写,测试器是读。因此这部分内容和扫描器实际上是在同个包内的,只是逻辑上将它分离了出来。

这部分其实和consul的功能是重复的,因此代码不过多赘述

数据库

由于校园网中,教务系统的地址不会太多,因此数据库的选择十分的随意(不存在性能方面的要求),所以这里使用自己熟悉的redis作为数据库。
由于本系统是和官方的负载均衡并行的 因此存在某些结点用于两者共同访问导致压力上涨,响应不及时,因此利用redissorted-set在测试的时候,将响应快的地址设置高分数,使用sorted-sets的好处还有一个就是,集合的元素都是不重复的!,对于代理池来说,这个分数代表着教务系统地址稳定性的重要标准,因此设置分数的规则如下:

  • 基本分:10分,最高分:20
  • 测试器设置一个更严格的超时时间来判断结点是否流畅
  • 分数基本分为可用,检测器会定时循环检测每个代理可用情况,一旦检测到有可用的代理则立即置为最高分
  • 新获取的代理分数设置为基本分,如果测试可行(流畅)则置为满分,不可行(超时)则分数减一
  • 分数减到0后代理移除

由于是内网环境,分数与超时时间可以根据实际情况设置更严格

主要功能就是简单的CRUD啦,本文只讲逻辑与伪代码,实现部分就不多说了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// set jwglxt to max score
func max(addr discover.Addr) (err error) {
member, err := addr.MarshalBinary()
if err != nil {
return err
}
err = rdb.ZAdd(ctx, redisConfig.Key, &redis.Z{
Score: SCORE_MAX,
Member: member,
}).Err()
if err != nil {
return err
}
return nil
}
// decrease proxy score
func decrease(addr discover.Addr) (err error) {
member, err := addr.MarshalBinary()
if err != nil {
return err
}
err = rdb.ZIncrBy(ctx, redisConfig.Key, -1, string(member)).Err()
if err != nil {
return err
}
score := rdb.ZScore(ctx, redisConfig.Key, string(member))
if score.Val() <= 0.00 {
log.Printf("%v current score %v, remove.\n", addr, score.Val())
err := rdb.ZRem(ctx, redisConfig.Key, member).Err()
if err != nil {
return err
}
}
return nil
}
// add new proxy to redis
func add(addrs discover.Addrs) (err error) {
for _, addr := range addrs {
buf, err := addr.MarshalBinary()
if err != nil {
return nil
}
err = rdb.ZAdd(ctx, redisConfig.Key, &redis.Z{
Score: SCORE_DEFAULT,
Member: buf,
}).Err()
if err != nil {
return err
}
}
return nil
}

consul-upsync-nginx实现动态负载均衡

该部分可以说是系统实现的关键了,因为nginx自带的负载均衡是写死的,不能根据后端情况动态调整,通过一番搜索对比,最终决定了consul + upsync方案。

  1. upsync 一个Nginx的模块(扩展)
  2. consul 一个分布式高可用的系统

这部分仅限于“能用”阶段,笔者也不太懂,就不乱说啦!
主要就是Api服务定时的获取数据库内容(分数作为权重),然后推送到consul

总结

该项目是学习Go时的一个练手项目,很多地方都不太好,因此仓库就不开源了🐕