Skip to content
Go back

在 ASP.NET Core 中构建异步 API 的正确方法

Published:  at  12:00 AM

在 ASP.NET Core 中构建异步 API 的正确方法

理解异步 API 的本质:从操作级别的思维转变

大多数开发者提到”异步 API”时,往往想到的是代码层面的 async/await 关键字。然而,真正的异步 API 设计是一种架构层面的思维转变,它将长耗时操作从用户感知的”等待”中彻底解耦出来。

传统的同步 API 遵循”请求-处理-响应”的线性模式,这种模式在快速操作(如数据查询、简单更新)中表现良好。但当面对图像处理、报表生成、视频转码等可能耗时数分钟甚至数小时的操作时,这种模式就会暴露出严重的问题:客户端超时、服务器资源浪费、用户体验糟糕。

异步 API 的核心思想是将工作分解为两个独立的阶段:

第一阶段是”接收与确认”,服务器立即接收请求,进行基本验证,并返回一个追踪标识符,整个过程通常在几毫秒内完成。第二阶段是”后台处理”,真正的业务逻辑在后台队列中异步执行,客户端可以通过状态端点查询进度,或者通过实时推送接收更新通知。

这种模式不仅仅是技术实现的改进,更是对用户体验的根本性提升。用户不再需要盯着加载动画等待,而是可以继续执行其他操作,系统会在任务完成时主动通知结果。

同步 API 的性能瓶颈与架构缺陷

让我们通过一个具体的图像处理场景来分析同步 API 的问题。典型的同步实现如下:

[HttpPost]
public async Task<IActionResult> UploadImage(IFormFile file)
{
    if (file is null)
    {
        return BadRequest();
    }

    // 保存原始图像,可能耗时 1-3 秒
    var originalPath = await SaveOriginalAsync(file);

    // 生成多种尺寸的缩略图,可能耗时 5-15 秒
    var thumbnails = await GenerateThumbnailsAsync(originalPath);

    // 图像压缩优化,可能耗时 3-10 秒
    await OptimizeImagesAsync(originalPath, thumbnails);

    return Ok(new { originalPath, thumbnails });
}

在这个实现中,整个 HTTP 请求的生命周期可能长达 30 秒甚至更久。这种设计存在多个严重问题:

首先是资源利用率低下。每个请求都会占用一个线程池线程,即使该线程大部分时间都在等待 I/O 操作完成。当并发请求增多时,线程池很快会耗尽,导致新请求排队等待。

其次是用户体验糟糕。客户端必须保持连接并等待整个处理过程完成,在网络不稳定或文件较大的情况下,用户可能会看到长时间的加载状态,甚至遭遇超时错误。

再者是容错能力差。如果处理过程中某个步骤失败(比如缩略图生成出错),整个请求都会失败,之前已完成的工作(如原始文件保存)也可能需要回滚。

最后是可扩展性限制。这种同步模式使得水平扩展变得困难,因为每个实例都必须具备完整的图像处理能力,无法针对不同的处理步骤进行专门的资源优化。

异步处理架构:分离关注点的优雅解决方案

正确的异步 API 设计需要从架构层面重新思考请求处理流程。我们需要将”接收请求”和”执行处理”完全分离,建立一个基于消息队列的异步处理管道。

请求接收阶段的优化实现

改进后的上传端点专注于快速响应和任务调度:

[HttpPost]
public async Task<IActionResult> UploadImage(IFormFile? file)
{
    // 快速验证,避免不必要的处理
    if (file is null)
    {
        return BadRequest("No file uploaded.");
    }

    if (!imageService.IsValidImage(file))
    {
        return BadRequest("Invalid image file.");
    }

    // 生成唯一任务标识
    var taskId = Guid.NewGuid().ToString();
    var folderPath = Path.Combine(_uploadDirectory, "images", taskId);
    var fileName = $"{taskId}{Path.GetExtension(file.FileName)}";

    // 仅执行必要的同步操作:保存原始文件
    var originalPath = await imageService.SaveOriginalImageAsync(
        file,
        folderPath,
        fileName
    );

    // 创建后台处理任务
    var processingJob = new ImageProcessingJob
    {
        Id = taskId,
        OriginalPath = originalPath,
        OutputPath = folderPath,
        CreatedAt = DateTime.UtcNow,
        Status = JobStatus.Queued
    };

    // 将任务加入队列
    await jobQueue.EnqueueAsync(processingJob);

    // 记录初始状态
    await statusTracker.SetStatusAsync(taskId, JobStatus.Queued);

    // 构建状态查询URL
    var statusUrl = Url.Action("GetStatus", new { id = taskId });

    // 立即返回 202 Accepted 状态
    return Accepted(statusUrl, new
    {
        id = taskId,
        status = "queued",
        statusUrl = statusUrl,
        estimatedCompletionTime = DateTime.UtcNow.AddMinutes(2)
    });
}

这个改进版本的关键优势在于极大缩短了 HTTP 请求的处理时间。客户端只需要等待文件上传和基本验证完成,通常在几秒钟内就能获得响应,而不是等待整个处理流程。

状态查询端点的设计

状态查询端点需要提供丰富的信息,不仅包括当前状态,还要包括进度详情和可用的资源链接:

[HttpGet("{id}/status")]
public async Task<IActionResult> GetStatus(string id)
{
    var jobStatus = await statusTracker.GetDetailedStatusAsync(id);
    if (jobStatus == null)
    {
        return NotFound(new { message = "Job not found" });
    }

    var response = new
    {
        id = jobStatus.Id,
        status = jobStatus.Status,
        progress = jobStatus.Progress,
        createdAt = jobStatus.CreatedAt,
        updatedAt = jobStatus.UpdatedAt,
        estimatedCompletionTime = jobStatus.EstimatedCompletionTime,
        currentStep = jobStatus.CurrentStep,
        totalSteps = jobStatus.TotalSteps,
        message = jobStatus.Message,
        links = new Dictionary<string, string>()
    };

    // 根据状态提供不同的资源链接
    switch (jobStatus.Status)
    {
        case JobStatus.Completed:
            response.links["original"] = GetImageUrl(id);
            response.links["thumbnail_small"] = GetThumbnailUrl(id, 200);
            response.links["thumbnail_medium"] = GetThumbnailUrl(id, 400);
            response.links["thumbnail_large"] = GetThumbnailUrl(id, 800);
            response.links["download"] = GetDownloadUrl(id);
            break;

        case JobStatus.Failed:
            response.links["retry"] = Url.Action("RetryJob", new { id });
            break;

        case JobStatus.Processing:
            response.links["cancel"] = Url.Action("CancelJob", new { id });
            break;
    }

    return Ok(response);
}

队列系统的架构设计

对于队列系统,我们需要根据部署架构选择合适的实现。单机部署可以使用 .NET 内置的 Channel 类型,而分布式部署则需要考虑 Redis、RabbitMQ 或 Azure Service Bus 等外部消息队列。

public class ImageProcessingJobQueue
{
    private readonly Channel<ImageProcessingJob> _channel;
    private readonly ILogger<ImageProcessingJobQueue> _logger;

    public ImageProcessingJobQueue(ILogger<ImageProcessingJobQueue> logger)
    {
        _logger = logger;

        // 配置有界队列,防止内存溢出
        var options = new BoundedChannelOptions(1000)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = false,
            SingleWriter = false
        };

        _channel = Channel.CreateBounded<ImageProcessingJob>(options);
    }

    public async ValueTask<bool> EnqueueAsync(
        ImageProcessingJob job,
        CancellationToken cancellationToken = default)
    {
        try
        {
            await _channel.Writer.WriteAsync(job, cancellationToken);
            _logger.LogInformation("Job {JobId} enqueued successfully", job.Id);
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to enqueue job {JobId}", job.Id);
            return false;
        }
    }

    public IAsyncEnumerable<ImageProcessingJob> DequeueAsync(
        CancellationToken cancellationToken = default)
    {
        return _channel.Reader.ReadAllAsync(cancellationToken);
    }

    public void CompleteWriting()
    {
        _channel.Writer.Complete();
    }
}

后台处理服务的健壮实现

后台处理服务是整个异步架构的核心,它需要具备容错、监控和资源管理能力:

public class ImageProcessingBackgroundService : BackgroundService
{
    private readonly ImageProcessingJobQueue _jobQueue;
    private readonly IStatusTracker _statusTracker;
    private readonly IImageProcessor _imageProcessor;
    private readonly ILogger<ImageProcessingBackgroundService> _logger;
    private readonly SemaphoreSlim _concurrencyLimiter;

    public ImageProcessingBackgroundService(
        ImageProcessingJobQueue jobQueue,
        IStatusTracker statusTracker,
        IImageProcessor imageProcessor,
        ILogger<ImageProcessingBackgroundService> logger)
    {
        _jobQueue = jobQueue;
        _statusTracker = statusTracker;
        _imageProcessor = imageProcessor;
        _logger = logger;

        // 限制并发处理数量,避免资源过载
        _concurrencyLimiter = new SemaphoreSlim(Environment.ProcessorCount);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var job in _jobQueue.DequeueAsync(stoppingToken))
        {
            // 不等待任务完成,允许并发处理
            _ = ProcessJobAsync(job, stoppingToken);
        }
    }

    private async Task ProcessJobAsync(ImageProcessingJob job, CancellationToken cancellationToken)
    {
        await _concurrencyLimiter.WaitAsync(cancellationToken);

        try
        {
            _logger.LogInformation("Starting processing job {JobId}", job.Id);

            await _statusTracker.UpdateStatusAsync(job.Id, JobStatus.Processing,
                "开始处理图像", currentStep: 1, totalSteps: 3);

            // 步骤 1: 生成缩略图
            await _statusTracker.UpdateStatusAsync(job.Id, JobStatus.Processing,
                "正在生成缩略图", currentStep: 1, totalSteps: 3);

            await _imageProcessor.GenerateThumbnailsAsync(
                job.OriginalPath,
                job.OutputPath,
                cancellationToken);

            // 步骤 2: 图像优化
            await _statusTracker.UpdateStatusAsync(job.Id, JobStatus.Processing,
                "正在优化图像", currentStep: 2, totalSteps: 3);

            await _imageProcessor.OptimizeImagesAsync(
                job.OriginalPath,
                job.OutputPath,
                cancellationToken);

            // 步骤 3: 生成元数据
            await _statusTracker.UpdateStatusAsync(job.Id, JobStatus.Processing,
                "正在生成元数据", currentStep: 3, totalSteps: 3);

            await _imageProcessor.GenerateMetadataAsync(
                job.OriginalPath,
                job.OutputPath,
                cancellationToken);

            // 完成处理
            await _statusTracker.UpdateStatusAsync(job.Id, JobStatus.Completed,
                "处理完成", currentStep: 3, totalSteps: 3);

            _logger.LogInformation("Job {JobId} completed successfully", job.Id);
        }
        catch (OperationCanceledException)
        {
            await _statusTracker.UpdateStatusAsync(job.Id, JobStatus.Cancelled, "任务已取消");
            _logger.LogWarning("Job {JobId} was cancelled", job.Id);
        }
        catch (Exception ex)
        {
            await _statusTracker.UpdateStatusAsync(job.Id, JobStatus.Failed,
                $"处理失败: {ex.Message}");

            _logger.LogError(ex, "Job {JobId} failed with error", job.Id);

            // 可以在这里实现重试逻辑
            await ConsiderRetryAsync(job, ex);
        }
        finally
        {
            _concurrencyLimiter.Release();
        }
    }

    private async Task ConsiderRetryAsync(ImageProcessingJob job, Exception exception)
    {
        // 实现指数退避重试策略
        if (job.RetryCount < 3 && IsRetriableException(exception))
        {
            job.RetryCount++;
            var delay = TimeSpan.FromSeconds(Math.Pow(2, job.RetryCount));

            _logger.LogInformation("Scheduling retry {RetryCount} for job {JobId} after {Delay}",
                job.RetryCount, job.Id, delay);

            // 延迟重新入队
            _ = Task.Delay(delay).ContinueWith(async _ =>
            {
                await _jobQueue.EnqueueAsync(job);
            });
        }
    }

    private static bool IsRetriableException(Exception exception)
    {
        return exception is not ArgumentException &&
               exception is not FileNotFoundException;
    }
}

超越轮询:实时推送的技术实现

虽然状态查询端点提供了基本的异步体验,但频繁的轮询会带来不必要的网络开销和服务器负载。更优雅的解决方案是采用服务器推送技术,在状态变化时主动通知客户端。

SignalR 实时通信的集成

SignalR 为 ASP.NET Core 应用提供了简单而强大的实时通信能力:

public class JobStatusHub : Hub
{
    public async Task JoinJobGroup(string jobId)
    {
        await Groups.AddToGroupAsync(Context.ConnectionId, $"job_{jobId}");
    }

    public async Task LeaveJobGroup(string jobId)
    {
        await Groups.RemoveFromGroupAsync(Context.ConnectionId, $"job_{jobId}");
    }
}

public class SignalRStatusNotifier : IStatusNotifier
{
    private readonly IHubContext<JobStatusHub> _hubContext;

    public SignalRStatusNotifier(IHubContext<JobStatusHub> hubContext)
    {
        _hubContext = hubContext;
    }

    public async Task NotifyStatusChangeAsync(string jobId, JobStatusUpdate update)
    {
        await _hubContext.Clients.Group($"job_{jobId}")
            .SendAsync("StatusUpdated", new
            {
                jobId,
                status = update.Status,
                progress = update.Progress,
                message = update.Message,
                timestamp = DateTime.UtcNow
            });
    }
}

Webhook 机制的系统集成

对于系统间通信,Webhook 提供了更为可靠的异步通知机制:

public class WebhookNotificationService : INotificationService
{
    private readonly HttpClient _httpClient;
    private readonly ILogger<WebhookNotificationService> _logger;

    public async Task SendCompletionNotificationAsync(
        string jobId,
        JobCompletionData completionData,
        string webhookUrl)
    {
        var payload = new
        {
            eventType = "job.completed",
            jobId,
            timestamp = DateTime.UtcNow,
            data = completionData
        };

        var jsonContent = JsonSerializer.Serialize(payload);
        var content = new StringContent(jsonContent, Encoding.UTF8, "application/json");

        try
        {
            var response = await _httpClient.PostAsync(webhookUrl, content);
            response.EnsureSuccessStatusCode();

            _logger.LogInformation("Webhook notification sent successfully for job {JobId}", jobId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send webhook notification for job {JobId}", jobId);

            // 可以实现重试机制或将失败的通知存储起来稍后重试
            await ScheduleWebhookRetryAsync(jobId, payload, webhookUrl);
        }
    }
}

监控与可观测性的完整方案

一个健壮的异步 API 系统需要全面的监控和可观测性支持,以便及时发现和解决问题:

public class JobMetricsCollector
{
    private readonly IMetrics _metrics;

    public void RecordJobEnqueued(string jobType)
    {
        _metrics.CreateCounter("jobs_enqueued_total")
            .Add(1, new TagList { { "job_type", jobType } });
    }

    public void RecordJobCompleted(string jobType, TimeSpan duration)
    {
        _metrics.CreateCounter("jobs_completed_total")
            .Add(1, new TagList { { "job_type", jobType } });

        _metrics.CreateHistogram("job_duration_seconds")
            .Record(duration.TotalSeconds, new TagList { { "job_type", jobType } });
    }

    public void RecordJobFailed(string jobType, string errorType)
    {
        _metrics.CreateCounter("jobs_failed_total")
            .Add(1, new TagList
            {
                { "job_type", jobType },
                { "error_type", errorType }
            });
    }
}

异步 API 的设计不仅仅是技术实现的改进,更是对用户体验和系统架构的全面升级。通过将请求接收与后台处理分离,我们能够构建出更加响应迅速、可扩展且容错性强的现代化 Web 应用。这种架构模式特别适合处理图像处理、数据分析、报表生成等耗时操作,是构建高性能 Web 应用的重要技术手段。



Previous Post
AI Shell Preview 6:引入 MCP 支持,革命性的命令行 AI 体验
Next Post
.NET 字符串比较最佳实践:为何 string.Equals 完胜 ToLower