Stream招聘Go后端开发工程师(远程/阿姆斯特丹)| 服务亿级用户的API解决方案

Stream招聘Go后端开发工程师(远程/阿姆斯特丹)| 服务亿级用户的API解决方案 我们正在寻找一名全职后端软件工程师加入我们的开发团队。工作职责包括参与Stream的核心API技术工作,以及设计和构建高性能软件。

你将要做的事情

你大部分时间将专注于软件设计、研究和编码。在典型的项目中,你将拥有很大的自由度,并会与另一位团队成员结对工作。我们的团队由经验非常丰富的工程师组成,其中一些拥有超过10年的经验。通过协作,你们将在工作中互相学习。你将对我们的API服务变得更快、更具扩展性和更灵活产生巨大影响。

你将向服务中添加新功能,并设法使现有功能的性能提升数个数量级。我们的客户拥有数百万用户;他们将Stream用于关键任务功能,例如展示内容和暴露其应用程序的核心功能。构建稳定可靠的软件不仅仅是一个选项:作为开发团队的一员,你将设计和编写最先进的软件,遵循最佳实践,度量一切,并负责部署到生产环境。你还将花费部分时间与我们的客户沟通,帮助他们在其应用中使用Stream。

面临的挑战

  • 分布式数据库:我们为信息流和聊天构建了自己的数据存储
  • 实时消息传递
  • 高性能:我们的API响应时间在10毫秒范围内
  • 高可扩展性:我们使用分片、主-主和主-从模式来确保可扩展性
  • 高可用性:我们的整个基础设施都经过设计和运营,能够承受整个数据中心崩溃的情况
  • 多区域:我们在4个不同的大洲部署我们的服务

你需要具备

  • 精通 Go 语言
  • 5年以上后端开发经验
  • 具有高流量和高性能应用的经验
  • 熟悉关系型数据库
  • 具有构建 HTTP API 的经验
  • 具有管理自己的项目和在团队中工作的经验

我们的技术栈

在Stream,我们使用广泛的技术集合为客户提供高度优化和可用的功能。多年来,我们尝试了不同的编程语言、框架、数据库和库。以下是我们当前使用的技术清单。如果你不精通所有这些,或者没有看到你喜欢的工具或语言,请不要担心,你将有机会接触到其中的大部分,并可以说服我们扩展这个列表:

  • Go, gRPC, RocksDB, Python
  • Postgresql, RabbitMQ
  • AWS, Puppet, CloudFormation
  • Grafana, Graphite, ELK, Jaeger
  • Redis, Memcached

我们为你提供的福利

Stream员工享有业内最佳的福利:

  • 一支卓越的工程师团队
  • 参与开源项目的机会
  • 有竞争力的薪酬
  • 28天带薪年假外加荷兰法定节假日
  • 公司股权
  • 养老金计划
  • 丰厚的学习和发展预算
  • 报销前往阿姆斯特丹的通勤费用,或在市内使用公司自行车
  • 报销自选的健身房会员费
  • MacBook Pro 或其他开发设备
  • 健康的团队午餐和充足的零食
  • 丰厚的搬迁安置津贴
  • 位于阿姆斯特丹市中心的办公室
  • 参加或向全球会议和聚会发表演讲的机会
  • 有机会访问我们在科罗拉多州博尔德的办公室

我们的文化

Stream拥有轻松随和的社交文化,我们的团队多元化,大家都有不同的背景。

我们才华横溢的开发者技术能力很强且乐于协作,这使得Stream成为一个学习和提升技能的绝佳场所。在软件工程方面,我们的文化注重所有权和质量:我们的目标是交付稳定的软件。

如果你有兴趣成为我们的一员,请立即申请: 申请链接:https://grnh.se/a3bbaf1b3us

谢绝猎头/中介


3 回复

您好, 邮件已发送,请查收。 此致 Trish


你好,

我很乐意为你提供帮助。

如需进一步详细讨论,请通过邮件 garry@cisinlabs.com 或 Skype: cis.garry 与我联系。

期待你的回复。

谢谢。

这是一个非常吸引人的Go后端开发职位。从技术栈和工作内容来看,这是一个对Go语言深度、系统设计和高性能处理能力要求极高的岗位。以下是我对这个职位技术要求的专业分析,并附上一些相关的Go代码示例,以展示胜任此工作可能需要的技术能力。

1. 高性能API与gRPC 职位要求API响应在10毫秒内,并提到了gRPC。这意味着需要精通高性能网络编程和协议设计。

// 一个使用gRPC的高性能服务端示例
package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"
    pb "path/to/your/proto/package" // 假设的protobuf生成代码
)

type server struct {
    pb.UnimplementedYourServiceServer
}

func (s *server) GetFeed(ctx context.Context, req *pb.FeedRequest) (*pb.FeedResponse, error) {
    // 核心逻辑:必须在极短时间内完成
    // 1. 从RocksDB/Redis缓存获取数据
    // 2. 可能的实时过滤和排序
    // 3. 构造响应
    start := time.Now()
    
    // 模拟高性能数据获取
    feedItems, err := s.cache.GetFeed(ctx, req.UserId, req.Limit)
    if err != nil {
        return nil, err
    }
    
    // 响应时间监控
    elapsed := time.Since(start)
    metrics.APIResponseTime.Observe(elapsed.Seconds() * 1000) // 毫秒
    
    return &pb.FeedResponse{Items: feedItems}, nil
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer(
        grpc.MaxConcurrentStreams(10000), // 高并发配置
        grpc.KeepaliveParams(keepalive.ServerParameters{
            MaxConnectionIdle: 5 * time.Minute,
        }),
    )
    pb.RegisterYourServiceServer(s, &server{})
    
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

2. 分布式数据存储与缓存 职位提到自建数据存储、分片和多区域部署,需要深入理解分布式系统。

// 分片数据访问层的简化示例
package storage

import (
    "context"
    "hash/fnv"
)

type ShardedStore struct {
    shards []*RocksDBStore // 假设的RocksDB封装
    shardCount int
}

func NewShardedStore(shardCount int) *ShardedStore {
    shards := make([]*RocksDBStore, shardCount)
    for i := 0; i < shardCount; i++ {
        shards[i] = NewRocksDBStore(fmt.Sprintf("shard-%d", i))
    }
    return &ShardedStore{
        shards: shards,
        shardCount: shardCount,
    }
}

func (s *ShardedStore) getShard(key string) *RocksDBStore {
    h := fnv.New32a()
    h.Write([]byte(key))
    shardIndex := int(h.Sum32()) % s.shardCount
    return s.shards[shardIndex]
}

func (s *ShardedStore) Put(ctx context.Context, key string, value []byte) error {
    shard := s.getShard(key)
    // 使用WriteBatch进行批量写入优化
    batch := shard.NewWriteBatch()
    defer batch.Close()
    
    batch.Put([]byte(key), value)
    return shard.Write(batch)
}

func (s *ShardedStore) MultiGet(ctx context.Context, keys []string) (map[string][]byte, error) {
    // 按分片分组查询以减少连接数
    shardGroups := make(map[int][]string)
    for _, key := range keys {
        shard := s.getShard(key)
        shardIndex := // 获取分片索引
        shardGroups[shardIndex] = append(shardGroups[shardIndex], key)
    }
    
    // 并行查询所有分片
    var wg sync.WaitGroup
    results := make(chan map[string][]byte, len(shardGroups))
    
    for shardIndex, shardKeys := range shardGroups {
        wg.Add(1)
        go func(idx int, keys []string) {
            defer wg.Done()
            shardResults := s.shards[idx].MultiGet(keys)
            results <- shardResults
        }(shardIndex, shardKeys)
    }
    
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 合并结果
    finalResult := make(map[string][]byte)
    for shardResult := range results {
        for k, v := range shardResult {
            finalResult[k] = v
        }
    }
    
    return finalResult, nil
}

3. 实时消息传递与并发模式 聊天和信息流需要高效的实时消息分发。

// 使用Go通道和goroutine的实时消息分发器
package realtime

import (
    "context"
    "sync"
    "time"
)

type Subscriber struct {
    UserID    string
    Channel   chan []byte
    LastSeen  time.Time
}

type MessageDispatcher struct {
    mu          sync.RWMutex
    subscribers map[string][]*Subscriber // userID -> subscribers
    pubsub      *RedisPubSub             // Redis Pub/Sub客户端
    
    bufferSize  int
    closeCh     chan struct{}
}

func NewMessageDispatcher(bufferSize int) *MessageDispatcher {
    md := &MessageDispatcher{
        subscribers: make(map[string][]*Subscriber),
        bufferSize:  bufferSize,
        closeCh:     make(chan struct{}),
        pubsub:      NewRedisPubSub(),
    }
    
    go md.startPubSubListener()
    return md
}

func (md *MessageDispatcher) Subscribe(userID string) *Subscriber {
    md.mu.Lock()
    defer md.mu.Unlock()
    
    sub := &Subscriber{
        UserID:   userID,
        Channel:  make(chan []byte, md.bufferSize),
        LastSeen: time.Now(),
    }
    
    md.subscribers[userID] = append(md.subscribers[userID], sub)
    
    // 订阅Redis频道以实现多区域同步
    md.pubsub.Subscribe(userID)
    
    return sub
}

func (md *MessageDispatcher) Unsubscribe(userID string, sub *Subscriber) {
    md.mu.Lock()
    defer md.mu.Unlock()
    
    subs := md.subscribers[userID]
    for i, s := range subs {
        if s == sub {
            close(s.Channel)
            md.subscribers[userID] = append(subs[:i], subs[i+1:]...)
            break
        }
    }
    
    if len(md.subscribers[userID]) == 0 {
        md.pubsub.Unsubscribe(userID)
    }
}

func (md *MessageDispatcher) Publish(userID string, message []byte) error {
    // 本地分发
    md.mu.RLock()
    subs := md.subscribers[userID]
    md.mu.RUnlock()
    
    for _, sub := range subs {
        select {
        case sub.Channel <- message:
            sub.LastSeen = time.Now()
        default:
            // 通道满,记录metrics
            metrics.DroppedMessages.Inc()
        }
    }
    
    // 发布到Redis以实现跨区域同步
    return md.pubsub.Publish(userID, message)
}

func (md *MessageDispatcher) startPubSubListener() {
    for {
        select {
        case msg := <-md.pubsub.Messages():
            // 处理来自其他区域的消息
            md.mu.RLock()
            subs := md.subscribers[msg.UserID]
            md.mu.RUnlock()
            
            for _, sub := range subs {
                select {
                case sub.Channel <- msg.Data:
                default:
                    metrics.CrossRegionDroppedMessages.Inc()
                }
            }
        case <-md.closeCh:
            return
        }
    }
}

4. 监控与可观测性 职位提到了Grafana、Graphite、ELK、Jaeger,需要全面的监控能力。

// 使用Prometheus和OpenTelemetry的监控示例
package monitoring

import (
    "context"
    "net/http"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

var (
    // Prometheus指标
    requestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
        Name:    "http_request_duration_seconds",
        Help:    "Duration of HTTP requests",
        Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), // 1ms到32秒
    }, []string{"path", "method", "status"})
    
    activeConnections = promauto.NewGauge(prometheus.GaugeOpts{
        Name: "active_connections",
        Help: "Number of active connections",
    })
    
    cacheHitRate = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "cache_hit_total",
        Help: "Cache hit statistics",
    }, []string{"type", "hit"})
)

// 带监控的HTTP中间件
func MonitoringMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        
        // 追踪上下文
        ctx := r.Context()
        tracer := otel.Tracer("api")
        ctx, span := tracer.Start(ctx, r.URL.Path)
        defer span.End()
        
        // 记录请求属性
        span.SetAttributes(
            attribute.String("http.method", r.Method),
            attribute.String("http.path", r.URL.Path),
            attribute.String("http.user_agent", r.UserAgent()),
        )
        
        // 包装ResponseWriter以获取状态码
        rw := &responseWriter{w, http.StatusOK}
        
        // 活跃连接数
        activeConnections.Inc()
        defer activeConnections.Dec()
        
        next.ServeHTTP(rw, r.WithContext(ctx))
        
        // 记录请求持续时间
        duration := time.Since(start).Seconds()
        requestDuration.WithLabelValues(
            r.URL.Path,
            r.Method,
            http.StatusText(rw.status),
        ).Observe(duration)
        
        // 添加追踪属性
        span.SetAttributes(
            attribute.Int("http.status_code", rw.status),
            attribute.Float64("http.duration_seconds", duration),
        )
    })
}

type responseWriter struct {
    http.ResponseWriter
    status int
}

func (rw *responseWriter) WriteHeader(code int) {
    rw.status = code
    rw.ResponseWriter.WriteHeader(code)
}

// 缓存包装器示例
type MonitoredCache struct {
    backend Cache
    name    string
}

func (mc *MonitoredCache) Get(ctx context.Context, key string) ([]byte, error) {
    // 添加缓存查询到追踪
    span := trace.SpanFromContext(ctx)
    span.AddEvent("cache_query", trace.WithAttributes(
        attribute.String("cache.name", mc.name),
        attribute.String("cache.key", key),
    ))
    
    data, err := mc.backend.Get(ctx, key)
    
    // 记录命中率
    hit := "miss"
    if err == nil && data != nil {
        hit = "hit"
    }
    cacheHitRate.WithLabelValues(mc.name, hit).Inc()
    
    span.SetAttributes(attribute.String("cache.hit", hit))
    
    return data, err
}

// 启动监控服务器
func StartMetricsServer(addr string) {
    http.Handle("/metrics", promhttp.Handler())
    go func() {
        if err := http.ListenAndServe(addr, nil); err != nil {
            log.Printf("Failed to start metrics server: %v", err)
        }
    }()
}

5. 数据库操作与连接管理 PostgreSQL和Redis的高性能使用。

// PostgreSQL连接池和健康检查
package database

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    _ "github.com/lib/pq"
    "github.com/jmoiron/sqlx"
)

type DBManager struct {
    primary   *sqlx.DB // 主数据库(写入)
    replicas  []*sqlx.DB // 只读副本
    nextReplica int
    mu        sync.RWMutex
    
    healthCheckInterval time.Duration
}

func NewDBManager(primaryDSN string, replicaDSNs []string) (*DBManager, error) {
    // 连接主数据库
    primary, err := sqlx.Connect("postgres", primaryDSN)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to primary: %w", err)
    }
    
    // 配置连接池
    primary.SetMaxOpenConns(50)
    primary.SetMaxIdleConns(25)
    primary.SetConnMaxLifetime(5 * time.Minute)
    
    // 连接副本
    replicas := make([]*sqlx.DB, len(replicaDSNs))
    for i, dsn := range replicaDSNs {
        replica, err := sqlx.Connect("postgres", dsn)
        if err != nil {
            return nil, fmt.Errorf("failed to connect to replica %d: %w", i, err)
        }
        replica.SetMaxOpenConns(30)
        replica.SetMaxIdleConns(15)
        replicas[i] = replica
    }
    
    mgr := &DBManager{
        primary:   primary,
        replicas:  replicas,
        healthCheckInterval: 30 * time.Second,
    }
    
    // 启动健康检查
    go mgr.startHealthChecks()
    
    return mgr, nil
}

func (mgr *DBManager) GetReplica() *sqlx.DB {
    mgr.mu.RLock()
    defer mgr.mu.RUnlock()
    
    if len(mgr.replicas) == 0 {
        return mgr.primary
    }
    
    // 简单轮询负载均衡
    idx := mgr.nextReplica % len(mgr.replicas)
    mgr.nextReplica++
    
    return mgr.replicas[idx]
}

func (mgr *DBManager) startHealthChecks() {
    ticker := time.NewTicker(mgr.healthCheckInterval)
    defer ticker.Stop()
    
    for range ticker.C {
        mgr.checkConnection(mgr.primary, "primary")
        
        for i, replica := range mgr.replicas {
            mgr.checkConnection(replica, fmt.Sprintf("replica-%d", i))
        }
    }
}

func (mgr *DBManager) checkConnection(db *sqlx.DB, name string) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    if err := db.PingContext(ctx); err != nil {
        log.Printf("Database %s health check failed: %v", name, err)
        metrics.DBHealthStatus.WithLabelValues(name).Set(0)
    } else {
        metrics.DBHealthStatus.WithLabelValues(name).Set(1)
    }
}

// 使用sqlx的高效查询示例
func (mgr *DBManager) GetUserFeed(ctx context.Context, userID string, limit int) ([]FeedItem, error) {
    // 使用只读副本
    db := mgr.GetReplica()
    
    var items []FeedItem
    query := `
        SELECT id, user_id, content, created_at, updated_at
        FROM feed_items 
        WHERE user_id = $1 AND deleted_at IS NULL
        ORDER BY created_at DESC 
        LIMIT $2
    `
    
    // 使用sqlx的结构体扫描
    err := db.SelectContext(ctx, &items, query, userID, limit)
    if err != nil {
        return nil, fmt.Errorf("failed to get user feed: %w", err)
    }
    
    return items, nil
}

// 批量插入优化
func (mgr *DBManager) BatchInsertActivities(ctx context.Context, activities []Activity) error {
    // 使用事务和批量插入
    tx, err := mgr.primary.BeginTxx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    // 使用COPY命令进行高效批量插入(PostgreSQL特定)
    stmt, err := tx.PrepareContext(ctx, 
        `COPY activities (actor, verb, object, target, created_at) FROM STDIN`)
    if err != nil {
        return err
    }
    defer stmt.Close()
    
    for _, activity := range activities {
        _, err := stmt.ExecContext(ctx,
            activity.Actor,
            activity.Verb,
            activity.Object,
            activity.Target,
            activity.CreatedAt,
        )
        if err != nil {
            return err
        }
    }
    
    return tx.Commit()
}

这个职位对Go开发者的要求非常全面,从语言特性掌握到底层系统优化,再到分布式系统设计都需要有深入的理解。上述代码示例展示了处理高并发、低延迟、分布式数据等挑战时可能采用的技术方案。

回到顶部