NestJS微服务
传统单体架构在面对高并发、快速迭代和跨团队协作时逐渐暴露出局限性,而微服务架构通过解耦服务、独立扩展和灵活部署的特性,成为现代分布式系统的核心解决方案
NestJS作为基于TypeScript的Node.js框架,凭借其模块化设计、依赖注入机制和对企业级特性的原生支持,成为构建微服务的首选工具
微服务概念
微服务是一种将复杂系统拆分为多个独立、自治、可协作的服务的架构模式。其核心概念围绕解耦和自治展开,以下是关键要点:
- 单一职责原则:每个微服务专注于一个业务领域(如用户管理、订单处理、支付系统),仅实现与其领域相关的功能,避免功能耦合;服务可独立打包、部署和扩展
- 轻量级通信:协议多样性,如HTTP/REST、gRPC(适用于需要即时响应的场景)、消息队列(如Kafka、RabbitMQ,适用于事件驱动、最终一致性);通过接口定义语言(如OpenAPI、Protobuf)明确服务间交互规则,确保跨团队协作的规范性
- 数据自治:独立数据存储每个服务拥有专属数据库(如MySQL、MongoDB),避免共享数据库导致的耦合
- 去中心化治理:不同服务可选择最适合的技术方案
典型适用场景有高并发业务(如电商秒杀、社交平台)、复杂业务系统、多团队协作项目等等
快速上手
这里带大家快速实现一个微服务案例,演示源码可以点击这里查看
项目结构
假设我们有个订单服务专门来负责订单相关的业务,然后通过主服务暴露给客户端调用,主服务再去调用订单微服务
创建主服务api-gateway
:
➜ nest n api-gateway
创建订单服务order-service
:
➜ nest n order-service
创建微服务最关键逻辑由@nestjs/microservices
这个包提供的,所以在每个项目中都需要装这个依赖
➜ npm i @nestjs/microservices
订单服务
先搭建订单微服务,微服务和平时的Nest服务入口有稍微的不同,但都差不多
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.TCP,
options: {
port: 3002,
},
},
);
app.listen();
}
bootstrap();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
看名字就可以理解了,只需要使用createMicroservice
就可以创建微服务了
这里采用TCP
协议和主应用进行通信,还有其他很多协议,我们后续讲
接着和平时Nest创建模块一样,新建一个order-status
模块用来模拟订单状态查询,然后也是建一个controller
import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
@Controller()
export class OrderStatusController {
@MessagePattern({ cmd: 'get-order-status' })
getOrderStatus() {
console.log('getOrderStatus');
return 'OK';
}
}
2
3
4
5
6
7
8
9
10
11
因为是微服务嘛所以Controller
中路由和普通的HTTP路由(@Get/@Post..)就不太一样
微服务之间的通信可以有很多种协议(如:TCP、RPC...),和不同的通信机制如:请求响应
、事件(发布订阅)
等等
这里采用的是请求响应的方式,使用@MessagePattern({ cmd: 'get-order-status' })
标识当前请求,主应用调用时只需要和这个标识对齐即可,看后面演示
注册微服务
微服务注册后就可以在主应用中注册使用了,注册需要用到ClientsModule.register
方法
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
@Module({
imports: [
ClientsModule.register([
{
name: 'ORDER_SERVICE',
transport: Transport.TCP,
options: { port: 3002 },
},
]),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
直接在AppModule
中imports
种注册,需要注意一下几点:
- transport必须和指定的微服务对应
- options必须和指定微服务匹配上,如port、host等等
调用微服务
微服务注册好后就可以使用了,首先需要再Controller中注入Inject
待使用的微服务,注意和注册的微服务name
要对应
import { Controller, Get, Inject } from '@nestjs/common';
import { AppService } from './app.service';
import { ClientProxy } from '@nestjs/microservices';
import { Observable } from 'rxjs';
@Controller()
export class AppController {
constructor(
private readonly appService: AppService,
@Inject('ORDER_SERVICE') private readonly orderService: ClientProxy,
) {}
}
2
3
4
5
6
7
8
9
10
11
12
定义HTTP路由后直接通过注册的微服务调用对应名字的方法即可
@Get('/api/order-status')
getOrderStatus(): Observable<string> {
return this.orderService.send({ cmd: 'get-order-status' }, {});
}
2
3
4
这里采用send
请求响应方式调用{ cmd: 'get-order-status' }
就可以了
启动所有应用服务,浏览器打开http://localhost:3000/api/order-status
就可以看到结果了
微服务使用就是这么简单,重要的是项目的架构划分
微服务通信模式
NestJS支持多种形式的微服务通信,如常见的TCP
、gRPC
、Kafka
等等,每种都有它的优势,工作中需要根据实际情况使用
下面简单介绍几个常见的方式
TCP
- 创建微服务服务端
创建微服务应用入口
// src/main.ts
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
// 创建微服务实例
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.TCP,
options: {
host: 'localhost',
port: 3001,
retryAttempts: 5, // 重试次数
retryDelay: 3000 // 重试间隔(ms)
}
}
);
await app.listen();
console.log(`Microservice is listening on TCP port 3001`);
}
bootstrap();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
创建微服务逻辑调用方法
// src/app.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
@Controller()
export class AppController {
// 处理 sum 请求
@MessagePattern('sum')
sum(@Payload() data: number[]): number {
console.log(`Received numbers: ${data}`);
return (data || []).reduce((a, b) => a + b);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
- 客户端注册TCP微服务
// client/src/app.module.ts
import { Module } from '@nestjs/common';
import { AppService } from './app.service';
@Module({
providers: [
AppService,
{
provide: 'MATH_SERVICE',
// 异步注册方式
useFactory: () => {
return ClientProxyFactory.create({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 3001,
}
});
}
}
]
})
export class AppModule {}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
- 客户端调用微服务
// client/src/app.service.ts
import { Injectable, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { timeout } from 'rxjs/operators';
@Injectable()
export class AppService {
constructor(
@Inject('MATH_SERVICE') private readonly client: ClientProxy
) {}
@Post("/xxx")
async executeExamples() {
// 发送 sum 请求
const sumResult = await this.client
.send<number, number[]>('sum', [1, 2, 3, 4, 5])
.pipe(timeout(5000))
.toPromise();
console.log(`Sum result: ${sumResult}`);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
gRPC
gRPC是一个现代、开源、高性能的RPC框架,可以在任何环境中运行。它可以通过可插拔的负载平衡、跟踪、健康检查和身份验证支持,有效地连接数据中心内和数据中心之间的服务
gRPC基于根据可以远程调用的函数(方法)定义服务的概念。对于每种方法,您需要定义参数和返回类型。服务、参数和返回类型在.proto
文件中使用Google的开源语言中立协议缓冲区机制进行定义
- 安装
➜ npm i --save @grpc/grpc-js @grpc/proto-loader
- 定义 Proto 文件
// hero/hero.proto
syntax = "proto3";
package hero;
service HeroesService {
rpc FindOne (HeroById) returns (Hero) {}
}
message HeroById {
int32 id = 1;
}
message Hero {
int32 id = 1;
string name = 2;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
- 创建微服务应用入口
// main.ts
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.GRPC,
name: 'HERO_PACKAGE',
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
url: '0.0.0.0:50051'
}
});
await app.listen();
}
2
3
4
5
6
7
8
9
10
11
12
13
- 定义微服务逻辑
@Controller()
export class HeroesController {
@GrpcMethod('HeroesService', 'FindOne')
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
const items = [
{ id: 1, name: 'John' },
{ id: 2, name: 'Doe' },
];
return items.find(({ id }) => id === data.id);
}
}
2
3
4
5
6
7
8
9
10
11
- 客户端注册GRPC
@Module({
imports: [
ClientsModule.register([
{
name: 'HERO_PACKAGE',
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, 'hero/hero.proto'),
},
},
]),
]
})
export class AppModule {}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- 客户端接口调用
@Injectable()
export class AppService implements OnModuleInit {
private heroesService: HeroesService;
constructor(@Inject('HERO_PACKAGE') private client: ClientGrpc) {}
onModuleInit() {
this.heroesService = this.client.getService<HeroesService>('HeroesService');
}
getHero(): Observable<string> {
return this.heroesService.findOne({ id: 1 });
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
MQTT
MQTT(消息队列遥测传输)是一种开源、轻量级的消息传递协议,针对低延迟进行了优化。该协议提供了一种可扩展且经济高效的方式,使用发布/订阅模型连接设备。基于MQTT构建的通信系统由发布服务器、代理和一个或多个客户端组成。它专为受限设备和低带宽、高延迟或不可靠的网络而设计
- 安装
➜ npm i --save mqtt
- 创建微服务
// mqtt-server/src/main.ts
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
clientId: 'nest-server-01',
clean: true,
reconnectPeriod: 5000,
will: {
topic: 'server/status',
payload: 'server_offline',
qos: 1,
retain: true
}
}
}
);
await app.listen();
}
bootstrap();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- 微服务消息处理实现
// mqtt-server/src/app.controller.ts
import { Controller } from '@nestjs/common';
import { EventPattern, MqttContext } from '@nestjs/microservices';
@Controller()
export class AppController {
// 处理温度数据
@EventPattern('sensors/+/temperature')
handleTemperature(
@Payload() data: Buffer,
@Ctx() context: MqttContext
) {
const topic = context.getTopic(); // 获取完整主题
const deviceId = topic.split('/')[1];
console.log(`[${deviceId}] 温度数据: ${data.toString()}`);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
- 客户端注册
// mqtt-client/src/mqtt-client.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
@Module({
imports: [
ClientsModule.register([
{
name: 'MQTT_SERVICE',
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
clientId: 'nest-client-01',
username: 'admin', // 认证信息
password: 'secret',
clean: false, // 持久会话
}
}
])
],
exports: [ClientsModule]
})
export class MqttClientModule {}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
- 客户端调用
// mqtt-client/src/mqtt-client.service.ts
import { Injectable, Inject } from '@nestjs/common';
import { ClientMqtt } from '@nestjs/microservices';
import { interval } from 'rxjs';
@Injectable()
export class MqttClientService {
constructor(
@Inject('MQTT_SERVICE') private readonly client: ClientMqtt
) {}
// 启动定时数据上报
startTelemetry(deviceId: string, intervalMs: number = 5000) {
return interval(intervalMs).subscribe(() => {
const temperature = 25 + Math.random() * 5;
this.client.emit(
`sensors/${deviceId}/temperature`,
temperature.toFixed(2)
);
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
通信模式对比
通信方式 | 协议 | 适用场景 | 性能指标(QPS) | 数据格式 |
---|---|---|---|---|
同步HTTP | REST/HTTP2 | 简单查询、低延迟需求 | 3k-5k | JSON/XML |
异步消息 | AMQP/Kafka | 事件驱动、最终一致性 | 50k-100k | Avro/Protobuf |
RPC | gRPC | 高性能内部通信 | 100k+ | Protobuf |
服务网格 | HTTP/2+TLS | 跨语言、复杂治理 | 80k-120k | 多种格式 |