Stream招聘Go后端开发工程师(远程/阿姆斯特丹)| 服务亿级用户的API解决方案
Stream招聘Go后端开发工程师(远程/阿姆斯特丹)| 服务亿级用户的API解决方案 我们正在寻找一名全职后端软件工程师加入我们的开发团队。工作职责包括参与Stream的核心API技术工作,以及设计和构建高性能软件。
你将要做的事情
你大部分时间将专注于软件设计、研究和编码。在典型的项目中,你将拥有很大的自由度,并会与另一位团队成员结对工作。我们的团队由经验非常丰富的工程师组成,其中一些拥有超过10年的经验。通过协作,你们将在工作中互相学习。你将对我们的API服务变得更快、更具扩展性和更灵活产生巨大影响。
你将向服务中添加新功能,并设法使现有功能的性能提升数个数量级。我们的客户拥有数百万用户;他们将Stream用于关键任务功能,例如展示内容和暴露其应用程序的核心功能。构建稳定可靠的软件不仅仅是一个选项:作为开发团队的一员,你将设计和编写最先进的软件,遵循最佳实践,度量一切,并负责部署到生产环境。你还将花费部分时间与我们的客户沟通,帮助他们在其应用中使用Stream。
面临的挑战
- 分布式数据库:我们为信息流和聊天构建了自己的数据存储
- 实时消息传递
- 高性能:我们的API响应时间在10毫秒范围内
- 高可扩展性:我们使用分片、主-主和主-从模式来确保可扩展性
- 高可用性:我们的整个基础设施都经过设计和运营,能够承受整个数据中心崩溃的情况
- 多区域:我们在4个不同的大洲部署我们的服务
你需要具备
- 精通 Go 语言
- 5年以上后端开发经验
- 具有高流量和高性能应用的经验
- 熟悉关系型数据库
- 具有构建 HTTP API 的经验
- 具有管理自己的项目和在团队中工作的经验
我们的技术栈
在Stream,我们使用广泛的技术集合为客户提供高度优化和可用的功能。多年来,我们尝试了不同的编程语言、框架、数据库和库。以下是我们当前使用的技术清单。如果你不精通所有这些,或者没有看到你喜欢的工具或语言,请不要担心,你将有机会接触到其中的大部分,并可以说服我们扩展这个列表:
- Go, gRPC, RocksDB, Python
- Postgresql, RabbitMQ
- AWS, Puppet, CloudFormation
- Grafana, Graphite, ELK, Jaeger
- Redis, Memcached
我们为你提供的福利
Stream员工享有业内最佳的福利:
- 一支卓越的工程师团队
- 参与开源项目的机会
- 有竞争力的薪酬
- 28天带薪年假外加荷兰法定节假日
- 公司股权
- 养老金计划
- 丰厚的学习和发展预算
- 报销前往阿姆斯特丹的通勤费用,或在市内使用公司自行车
- 报销自选的健身房会员费
- MacBook Pro 或其他开发设备
- 健康的团队午餐和充足的零食
- 丰厚的搬迁安置津贴
- 位于阿姆斯特丹市中心的办公室
- 参加或向全球会议和聚会发表演讲的机会
- 有机会访问我们在科罗拉多州博尔德的办公室
我们的文化
Stream拥有轻松随和的社交文化,我们的团队多元化,大家都有不同的背景。
我们才华横溢的开发者技术能力很强且乐于协作,这使得Stream成为一个学习和提升技能的绝佳场所。在软件工程方面,我们的文化注重所有权和质量:我们的目标是交付稳定的软件。
如果你有兴趣成为我们的一员,请立即申请: 申请链接:https://grnh.se/a3bbaf1b3us
谢绝猎头/中介
您好, 邮件已发送,请查收。 此致 Trish
这是一个非常吸引人的Go后端开发职位。从技术栈和工作内容来看,这是一个对Go语言深度、系统设计和高性能处理能力要求极高的岗位。以下是我对这个职位技术要求的专业分析,并附上一些相关的Go代码示例,以展示胜任此工作可能需要的技术能力。
1. 高性能API与gRPC 职位要求API响应在10毫秒内,并提到了gRPC。这意味着需要精通高性能网络编程和协议设计。
// 一个使用gRPC的高性能服务端示例
package main
import (
"context"
"log"
"net"
"google.golang.org/grpc"
pb "path/to/your/proto/package" // 假设的protobuf生成代码
)
type server struct {
pb.UnimplementedYourServiceServer
}
func (s *server) GetFeed(ctx context.Context, req *pb.FeedRequest) (*pb.FeedResponse, error) {
// 核心逻辑:必须在极短时间内完成
// 1. 从RocksDB/Redis缓存获取数据
// 2. 可能的实时过滤和排序
// 3. 构造响应
start := time.Now()
// 模拟高性能数据获取
feedItems, err := s.cache.GetFeed(ctx, req.UserId, req.Limit)
if err != nil {
return nil, err
}
// 响应时间监控
elapsed := time.Since(start)
metrics.APIResponseTime.Observe(elapsed.Seconds() * 1000) // 毫秒
return &pb.FeedResponse{Items: feedItems}, nil
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer(
grpc.MaxConcurrentStreams(10000), // 高并发配置
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 5 * time.Minute,
}),
)
pb.RegisterYourServiceServer(s, &server{})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
2. 分布式数据存储与缓存 职位提到自建数据存储、分片和多区域部署,需要深入理解分布式系统。
// 分片数据访问层的简化示例
package storage
import (
"context"
"hash/fnv"
)
type ShardedStore struct {
shards []*RocksDBStore // 假设的RocksDB封装
shardCount int
}
func NewShardedStore(shardCount int) *ShardedStore {
shards := make([]*RocksDBStore, shardCount)
for i := 0; i < shardCount; i++ {
shards[i] = NewRocksDBStore(fmt.Sprintf("shard-%d", i))
}
return &ShardedStore{
shards: shards,
shardCount: shardCount,
}
}
func (s *ShardedStore) getShard(key string) *RocksDBStore {
h := fnv.New32a()
h.Write([]byte(key))
shardIndex := int(h.Sum32()) % s.shardCount
return s.shards[shardIndex]
}
func (s *ShardedStore) Put(ctx context.Context, key string, value []byte) error {
shard := s.getShard(key)
// 使用WriteBatch进行批量写入优化
batch := shard.NewWriteBatch()
defer batch.Close()
batch.Put([]byte(key), value)
return shard.Write(batch)
}
func (s *ShardedStore) MultiGet(ctx context.Context, keys []string) (map[string][]byte, error) {
// 按分片分组查询以减少连接数
shardGroups := make(map[int][]string)
for _, key := range keys {
shard := s.getShard(key)
shardIndex := // 获取分片索引
shardGroups[shardIndex] = append(shardGroups[shardIndex], key)
}
// 并行查询所有分片
var wg sync.WaitGroup
results := make(chan map[string][]byte, len(shardGroups))
for shardIndex, shardKeys := range shardGroups {
wg.Add(1)
go func(idx int, keys []string) {
defer wg.Done()
shardResults := s.shards[idx].MultiGet(keys)
results <- shardResults
}(shardIndex, shardKeys)
}
go func() {
wg.Wait()
close(results)
}()
// 合并结果
finalResult := make(map[string][]byte)
for shardResult := range results {
for k, v := range shardResult {
finalResult[k] = v
}
}
return finalResult, nil
}
3. 实时消息传递与并发模式 聊天和信息流需要高效的实时消息分发。
// 使用Go通道和goroutine的实时消息分发器
package realtime
import (
"context"
"sync"
"time"
)
type Subscriber struct {
UserID string
Channel chan []byte
LastSeen time.Time
}
type MessageDispatcher struct {
mu sync.RWMutex
subscribers map[string][]*Subscriber // userID -> subscribers
pubsub *RedisPubSub // Redis Pub/Sub客户端
bufferSize int
closeCh chan struct{}
}
func NewMessageDispatcher(bufferSize int) *MessageDispatcher {
md := &MessageDispatcher{
subscribers: make(map[string][]*Subscriber),
bufferSize: bufferSize,
closeCh: make(chan struct{}),
pubsub: NewRedisPubSub(),
}
go md.startPubSubListener()
return md
}
func (md *MessageDispatcher) Subscribe(userID string) *Subscriber {
md.mu.Lock()
defer md.mu.Unlock()
sub := &Subscriber{
UserID: userID,
Channel: make(chan []byte, md.bufferSize),
LastSeen: time.Now(),
}
md.subscribers[userID] = append(md.subscribers[userID], sub)
// 订阅Redis频道以实现多区域同步
md.pubsub.Subscribe(userID)
return sub
}
func (md *MessageDispatcher) Unsubscribe(userID string, sub *Subscriber) {
md.mu.Lock()
defer md.mu.Unlock()
subs := md.subscribers[userID]
for i, s := range subs {
if s == sub {
close(s.Channel)
md.subscribers[userID] = append(subs[:i], subs[i+1:]...)
break
}
}
if len(md.subscribers[userID]) == 0 {
md.pubsub.Unsubscribe(userID)
}
}
func (md *MessageDispatcher) Publish(userID string, message []byte) error {
// 本地分发
md.mu.RLock()
subs := md.subscribers[userID]
md.mu.RUnlock()
for _, sub := range subs {
select {
case sub.Channel <- message:
sub.LastSeen = time.Now()
default:
// 通道满,记录metrics
metrics.DroppedMessages.Inc()
}
}
// 发布到Redis以实现跨区域同步
return md.pubsub.Publish(userID, message)
}
func (md *MessageDispatcher) startPubSubListener() {
for {
select {
case msg := <-md.pubsub.Messages():
// 处理来自其他区域的消息
md.mu.RLock()
subs := md.subscribers[msg.UserID]
md.mu.RUnlock()
for _, sub := range subs {
select {
case sub.Channel <- msg.Data:
default:
metrics.CrossRegionDroppedMessages.Inc()
}
}
case <-md.closeCh:
return
}
}
}
4. 监控与可观测性 职位提到了Grafana、Graphite、ELK、Jaeger,需要全面的监控能力。
// 使用Prometheus和OpenTelemetry的监控示例
package monitoring
import (
"context"
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
var (
// Prometheus指标
requestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "Duration of HTTP requests",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), // 1ms到32秒
}, []string{"path", "method", "status"})
activeConnections = promauto.NewGauge(prometheus.GaugeOpts{
Name: "active_connections",
Help: "Number of active connections",
})
cacheHitRate = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "cache_hit_total",
Help: "Cache hit statistics",
}, []string{"type", "hit"})
)
// 带监控的HTTP中间件
func MonitoringMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// 追踪上下文
ctx := r.Context()
tracer := otel.Tracer("api")
ctx, span := tracer.Start(ctx, r.URL.Path)
defer span.End()
// 记录请求属性
span.SetAttributes(
attribute.String("http.method", r.Method),
attribute.String("http.path", r.URL.Path),
attribute.String("http.user_agent", r.UserAgent()),
)
// 包装ResponseWriter以获取状态码
rw := &responseWriter{w, http.StatusOK}
// 活跃连接数
activeConnections.Inc()
defer activeConnections.Dec()
next.ServeHTTP(rw, r.WithContext(ctx))
// 记录请求持续时间
duration := time.Since(start).Seconds()
requestDuration.WithLabelValues(
r.URL.Path,
r.Method,
http.StatusText(rw.status),
).Observe(duration)
// 添加追踪属性
span.SetAttributes(
attribute.Int("http.status_code", rw.status),
attribute.Float64("http.duration_seconds", duration),
)
})
}
type responseWriter struct {
http.ResponseWriter
status int
}
func (rw *responseWriter) WriteHeader(code int) {
rw.status = code
rw.ResponseWriter.WriteHeader(code)
}
// 缓存包装器示例
type MonitoredCache struct {
backend Cache
name string
}
func (mc *MonitoredCache) Get(ctx context.Context, key string) ([]byte, error) {
// 添加缓存查询到追踪
span := trace.SpanFromContext(ctx)
span.AddEvent("cache_query", trace.WithAttributes(
attribute.String("cache.name", mc.name),
attribute.String("cache.key", key),
))
data, err := mc.backend.Get(ctx, key)
// 记录命中率
hit := "miss"
if err == nil && data != nil {
hit = "hit"
}
cacheHitRate.WithLabelValues(mc.name, hit).Inc()
span.SetAttributes(attribute.String("cache.hit", hit))
return data, err
}
// 启动监控服务器
func StartMetricsServer(addr string) {
http.Handle("/metrics", promhttp.Handler())
go func() {
if err := http.ListenAndServe(addr, nil); err != nil {
log.Printf("Failed to start metrics server: %v", err)
}
}()
}
5. 数据库操作与连接管理 PostgreSQL和Redis的高性能使用。
// PostgreSQL连接池和健康检查
package database
import (
"context"
"database/sql"
"fmt"
"time"
_ "github.com/lib/pq"
"github.com/jmoiron/sqlx"
)
type DBManager struct {
primary *sqlx.DB // 主数据库(写入)
replicas []*sqlx.DB // 只读副本
nextReplica int
mu sync.RWMutex
healthCheckInterval time.Duration
}
func NewDBManager(primaryDSN string, replicaDSNs []string) (*DBManager, error) {
// 连接主数据库
primary, err := sqlx.Connect("postgres", primaryDSN)
if err != nil {
return nil, fmt.Errorf("failed to connect to primary: %w", err)
}
// 配置连接池
primary.SetMaxOpenConns(50)
primary.SetMaxIdleConns(25)
primary.SetConnMaxLifetime(5 * time.Minute)
// 连接副本
replicas := make([]*sqlx.DB, len(replicaDSNs))
for i, dsn := range replicaDSNs {
replica, err := sqlx.Connect("postgres", dsn)
if err != nil {
return nil, fmt.Errorf("failed to connect to replica %d: %w", i, err)
}
replica.SetMaxOpenConns(30)
replica.SetMaxIdleConns(15)
replicas[i] = replica
}
mgr := &DBManager{
primary: primary,
replicas: replicas,
healthCheckInterval: 30 * time.Second,
}
// 启动健康检查
go mgr.startHealthChecks()
return mgr, nil
}
func (mgr *DBManager) GetReplica() *sqlx.DB {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
if len(mgr.replicas) == 0 {
return mgr.primary
}
// 简单轮询负载均衡
idx := mgr.nextReplica % len(mgr.replicas)
mgr.nextReplica++
return mgr.replicas[idx]
}
func (mgr *DBManager) startHealthChecks() {
ticker := time.NewTicker(mgr.healthCheckInterval)
defer ticker.Stop()
for range ticker.C {
mgr.checkConnection(mgr.primary, "primary")
for i, replica := range mgr.replicas {
mgr.checkConnection(replica, fmt.Sprintf("replica-%d", i))
}
}
}
func (mgr *DBManager) checkConnection(db *sqlx.DB, name string) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
log.Printf("Database %s health check failed: %v", name, err)
metrics.DBHealthStatus.WithLabelValues(name).Set(0)
} else {
metrics.DBHealthStatus.WithLabelValues(name).Set(1)
}
}
// 使用sqlx的高效查询示例
func (mgr *DBManager) GetUserFeed(ctx context.Context, userID string, limit int) ([]FeedItem, error) {
// 使用只读副本
db := mgr.GetReplica()
var items []FeedItem
query := `
SELECT id, user_id, content, created_at, updated_at
FROM feed_items
WHERE user_id = $1 AND deleted_at IS NULL
ORDER BY created_at DESC
LIMIT $2
`
// 使用sqlx的结构体扫描
err := db.SelectContext(ctx, &items, query, userID, limit)
if err != nil {
return nil, fmt.Errorf("failed to get user feed: %w", err)
}
return items, nil
}
// 批量插入优化
func (mgr *DBManager) BatchInsertActivities(ctx context.Context, activities []Activity) error {
// 使用事务和批量插入
tx, err := mgr.primary.BeginTxx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 使用COPY命令进行高效批量插入(PostgreSQL特定)
stmt, err := tx.PrepareContext(ctx,
`COPY activities (actor, verb, object, target, created_at) FROM STDIN`)
if err != nil {
return err
}
defer stmt.Close()
for _, activity := range activities {
_, err := stmt.ExecContext(ctx,
activity.Actor,
activity.Verb,
activity.Object,
activity.Target,
activity.CreatedAt,
)
if err != nil {
return err
}
}
return tx.Commit()
}
这个职位对Go开发者的要求非常全面,从语言特性掌握到底层系统优化,再到分布式系统设计都需要有深入的理解。上述代码示例展示了处理高并发、低延迟、分布式数据等挑战时可能采用的技术方案。

