O objetivo deste post, é apresentar uma forma de entregar dados em tempo real através da web com uma técnica utilizada pelos grandes players como Google e Twitter: O Streaming de Dados.
O streaming de dados é muito útil quando você precisa entregar grandes quantidades de dados onde provavelmente seria utilizada uma técnica de paginação no consumo de apis, ou mesmo quando é necessário entregar conteúdo em tempo real a partir de eventos.
No .NET Framework, era possível fazer isso com o auxílio da classe PushStreamContent que retornava os dados em HttpResponseMessage. Entretanto no .NET Core deixamos de utilizar o HttpResponseMessage para utilizar a interface IActionResult em nossos retornos de API, sendo assim, pretendo mostrar uma forma de criar uma api de streaming em .NET Core utilizando este novo modelo.
Criando o projeto
Primeiro vamos criar o projeto, para isso basta no Visual Studio clicar em File>New>Project e criar um ASP.NET Core Web Application
Após clicar em OK, irá perguntar sobre o template do projeto, selecione empty e clique em OK.
Apenas um detalhe, é que o Suporte a Docker não é obrigatório, você pode seguir este tutorial sem docker normalmente.
Feito isso, o Visual Studio criará a seguinte estrutura:
Até aqui tudo bem! Vamos codar! \o/
Codando \o/
Dentro do projeto, crie uma pasta Controllers e dentro desta pasta crie uma controller chamada Cliente.
Vamos criar uma pasta chamada Results e dentro dela criaremos uma classe com o nome de PushStreamResult, que será responsável por obter o stream, setar o content type e retornar para um callback onde vamos identificar quem receberá os conteúdos. Sua estrutura deve estar assim:
Beleza! Agora vamos colocar código na classe PushStreamResult, sua classe deve ficar mais ou menos assim:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.AspNetCore.Http; | |
using Microsoft.AspNetCore.Mvc; | |
using Microsoft.Net.Http.Headers; | |
using System; | |
using System.IO; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace DotNetCoreStreaming.Results | |
{ | |
public class PushStreamResult : IActionResult | |
{ | |
private readonly Action<Stream, CancellationToken> _onStreamAvailable; | |
private readonly string _contentType; | |
private readonly CancellationToken _requestAborted; | |
public PushStreamResult(Action<Stream, CancellationToken> onStreamAvailable, string contentType, CancellationToken requestAborted) | |
{ | |
_onStreamAvailable = onStreamAvailable; | |
_contentType = contentType; | |
_requestAborted = requestAborted; | |
} | |
public Task ExecuteResultAsync(ActionContext context) | |
{ | |
var stream = context.HttpContext.Response.Body; | |
context.HttpContext.Response.GetTypedHeaders().ContentType = new MediaTypeHeaderValue(_contentType); | |
_onStreamAvailable(stream, _requestAborted); | |
return Task.CompletedTask; | |
} | |
} | |
} |
Observe que ela herda de IActionResult para podermos retornar em nossa rota da controller e no método ExecuteResultAsync nós apenas pegamos o body e o cancellationToken para passar ao callback _onStreamAvailable e setamos o ContentType que foi passado no construtor da classe.
Agora precisamos modificar nossa classe de startup, para que ela reconheça a estrutura do MVC:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.AspNetCore.Builder; | |
using Microsoft.AspNetCore.Hosting; | |
using Microsoft.AspNetCore.Mvc; | |
using Microsoft.Extensions.DependencyInjection; | |
namespace DotNetCoreStreaming | |
{ | |
public class Startup | |
{ | |
public void ConfigureServices(IServiceCollection services) | |
{ | |
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1); | |
} | |
public void Configure(IApplicationBuilder app, IHostingEnvironment env) | |
{ | |
if (env.IsDevelopment()) | |
{ | |
app.UseDeveloperExceptionPage(); | |
} | |
else | |
{ | |
app.UseHsts(); | |
} | |
app.UseHttpsRedirection(); | |
app.UseMvc(); | |
} | |
} | |
} |
Antes de começar a mexer na controller de cliente para simular os eventos de update e insert, eu criei um enum de eventos e uma classe de cliente bem simples conforme abaixo:
Enum de eventos:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace DotNetCoreStreaming.Enums | |
{ | |
public enum EventoEnum | |
{ | |
Insert = 1, | |
Update = 2 | |
} | |
} |
Modelo de cliente:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace DotNetCoreStreaming.Models | |
{ | |
public class Cliente | |
{ | |
public long Id { get; set; } | |
public string Nome { get; set; } | |
} | |
} |
Feito isso, vamos à controller de cliente:
Na controller de cliente, precisamos criar uma ConcurrentBag estática que será onde vamos armazenar os clients do nosso streaming, ou seja, quem irá consumir os dados:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private static ConcurrentBag<StreamWriter> _clients; | |
static ClienteController() | |
{ | |
_clients = new ConcurrentBag<StreamWriter>(); | |
} |
Perceba que já inicializamos no construtor da controller para que possamos ir adicionando os clients conforme forem chegando 😉
Feito isso, vamos definir os métodos de insert e update da controller, veja que está bem simples para que possamos focar na funcionalidade do streaming:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[HttpPost] | |
public IActionResult Post(Cliente cliente) | |
{ | |
//Fazer o Insert | |
EnviarEvento(cliente, EventoEnum.Insert); | |
return Ok(); | |
} | |
[HttpPut] | |
public IActionResult Put(Cliente cliente) | |
{ | |
//Fazer o Update | |
EnviarEvento(cliente, EventoEnum.Update); | |
return Ok(); | |
} | |
private static async Task EnviarEvento(object dados, EventoEnum evento) | |
{ | |
foreach (var client in _clients) | |
{ | |
string jsonEvento = string.Format("{0}\n", JsonConvert.SerializeObject(new { dados, evento })); | |
await client.WriteAsync(jsonEvento); | |
await client.FlushAsync(); | |
} | |
} |
Aqui também já adicionamos o método que irá escrever o json no streaming, ele recebe o cliente na variável dados e o enum de evento, para que possamos passar tudo isso à requisição http do streaming.
Por último, vamos adicionar a rota para se inscrever no streaming e o callback do PushStreamResult que adicionará os clients em nossa ConcurrentBag e irá tratar caso algum dos clients feche a sessão.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[HttpGet] | |
[Route("Streaming")] | |
public IActionResult Stream() | |
{ | |
return new PushStreamResult(OnStreamAvailable, "text/event-stream", HttpContext.RequestAborted); | |
} | |
private void OnStreamAvailable(Stream stream, CancellationToken requestAborted) | |
{ | |
var wait = requestAborted.WaitHandle; | |
var client = new StreamWriter(stream); | |
_clients.Add(client); | |
wait.WaitOne(); | |
StreamWriter ignore; | |
_clients.TryTake(out ignore); | |
} |
Aqui criamos uma action para recebermos os dados do streaming utilizando a classe PushStreamResult e adicionamos o método OnStreamAvailable que será responsável por tratar os clients que quiserem receber os dados do streaming.
A controller de cliente deve ficar assim:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using DotNetCoreStreaming.Enums; | |
using DotNetCoreStreaming.Models; | |
using DotNetCoreStreaming.Results; | |
using Microsoft.AspNetCore.Http; | |
using Microsoft.AspNetCore.Mvc; | |
using Newtonsoft.Json; | |
using System.Collections.Concurrent; | |
using System.IO; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace DotNetCoreStreaming.Controllers | |
{ | |
[Route("api/[controller]")] | |
[ApiController] | |
public class ClienteController : ControllerBase | |
{ | |
private static ConcurrentBag<StreamWriter> _clients; | |
static ClienteController() | |
{ | |
_clients = new ConcurrentBag<StreamWriter>(); | |
} | |
[HttpPost] | |
public IActionResult Post(Cliente cliente) | |
{ | |
//Fazer o Insert | |
EnviarEvento(cliente, EventoEnum.Insert); | |
return Ok(); | |
} | |
[HttpPut] | |
public IActionResult Put(Cliente cliente) | |
{ | |
//Fazer o Update | |
EnviarEvento(cliente, EventoEnum.Update); | |
return Ok(); | |
} | |
private static async Task EnviarEvento(object dados, EventoEnum evento) | |
{ | |
foreach (var client in _clients) | |
{ | |
string jsonEvento = string.Format("{0}\n", JsonConvert.SerializeObject(new { dados, evento })); | |
await client.WriteAsync(jsonEvento); | |
await client.FlushAsync(); | |
} | |
} | |
[HttpGet] | |
[Route("Streaming")] | |
public IActionResult Stream() | |
{ | |
return new PushStreamResult(OnStreamAvailable, "text/event-stream", HttpContext.RequestAborted); | |
} | |
private void OnStreamAvailable(Stream stream, CancellationToken requestAborted) | |
{ | |
var wait = requestAborted.WaitHandle; | |
var client = new StreamWriter(stream); | |
_clients.Add(client); | |
wait.WaitOne(); | |
StreamWriter ignore; | |
_clients.TryTake(out ignore); | |
} | |
} | |
} |
Funciona??
Para testar, criei no postmam um json de cliente, para ver se o streaming está funcionando basta acessar pelo browser a rota https://localhost:44385/api/cliente/streaming (troque a porta pela sua).
Verifique que quando você colocar essa url no browser, ele ficará carregando e não fará nada além disso:
Ainda com o browser aberto, experimente fazer uma requisição para nossas API’s de Insert ou Update:
Veja o que aparece no browser:
Repare que os dados são escritos em tempo real no browser e que podemos utilizar essa arquitetura de diversas formas, até mesmo para aplicativos mobile realtime, mas isso é assunto para outro post…
O projeto está no meu github em https://github.com/sergioprates/DotNetCoreStreaming
(Cross-post de https://medium.com/@sergioprates/criando-uma-api-streaming-com-net-core-b2eeaab0dfac)
Sergio Prates