Application réactive over HTTP

Ici, c'est pour parler d'une problématique que ne couvre pas les applications évoquant la programmation réactive: à savoir la partie où les données qui doivent transiter sur le protocole HTTP.

En effet, je vois beaucoup d'articles qui disent que si nous utilisons Spring Flux côté backend et Angular côté frontend (par exemple), nous avons alors une application qui est totalement réactive.

Sur le papier, c'est vrai. Dans la réalité, c'est une autre histoire. Et pour cela, nous allons nous faire une mise en situation.

Création de l'application

Créons une application Spring Flux. Pour ne pas s'embêter, nous allons utiliser cet excellent article https://www.baeldung.com/spring-webflux et récupérer le code source sur https://github.com/eugenp/tutorials.

Faisons un "mvn clean install" et dirigeons nous vers le répertoire "spring-5-reactive-security". Dedans nous allons trouver un contrôlleur REST basé sur Spring Flux, qui va nous retourner une liste d'employés:

package com.baeldung.webflux;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/employees")
public class EmployeeController {

    private EmployeeRepository employeeRepository;

    public EmployeeController(EmployeeRepository employeeRepository) {
        this.employeeRepository = employeeRepository;
    }

    @GetMapping("/{id}")
    private Mono<Employee> getEmployeeById(@PathVariable String id) {
        return employeeRepository.findEmployeeById(id);
    }

    @GetMapping
    private Flux<Employee> getAllEmployees() {
        return employeeRepository.findAllEmployees();
    }

    @PostMapping("/update")
    private Mono<Employee> updateEmployee(@RequestBody Employee employee) {
        return employeeRepository.updateEmployee(employee);
    }

}

Si vous lancer la commande "mvn spring-boot:run", le serveur va se lancer et vous pourrez voir la liste d'employés sur le lien http://localhost:8080/employees.

Le principe de Spring Flux est de pouvoir faire de la programmation réactive, et à ce titre de remonter les données au fur et à mesure qu'elles arrivent. Pour simuler une latence sur la remontée de données, nous allons éditer le fichier "src/main/java/com/baeldung/webflux/EmployeeRepository.java" et modifier le code suivant:

public Flux<Employee> findAllEmployees()
{
    return Flux.fromIterable(employeeData.values());
}

Par:

public Flux<Employee> findAllEmployees()
{
    return Flux.fromIterable(employeeData.values())
        .delayElements(java.util.Duration.ofSeconds(2));
}


Maintenant créons une application Angular. Voici les commandes de bases:

> npm install --global @angular/cli
> ng new reactive-over-http-example
> cd reactive-over-http-example

Nous allons créer un composant basique chargeant les employées:

import {Component} from '@angular/core';
import {Employee, EmployeeService} from '../service/employee.service';

@Component({
  selector: 'app-step1',
  templateUrl: './step1.component.html',
  styleUrls: ['./step1.component.css']
})
export class Step1Component {
  employees: Employee[] = [];
  isError = false;

  constructor(
    private employeeService: EmployeeService
  ) { }

  load() {
    this.employeeService
      .getEmployees()
      .subscribe(
        (employees) => this.employees = employees,
        (err) => {
          console.error(err);
          this.isError = true;
        }
      );
  }

}

Avec le code HTML suivant:

<h1>Classical workflow</h1>
<p>
  <button (click)="load()">Click to load</button>
</p>
<hr />
<p *ngIf="isError">
  <b>An error occured</b>
</p>
<ul *ngIf="!isError">
  <li *ngFor="let employee of employees">
    ({{ employee.id }}) {{ employee.name }}
  </li>
</ul>


Faisons ensuite le service suivant:

import {Injectable} from '@angular/core';
import {HttpClient} from '@angular/common/http';
import {Observable} from 'rxjs';

@Injectable({
  providedIn: 'root'
})
export class EmployeeService {

  constructor(
    private httpClient: HttpClient
  ) { }

  getEmployees(): Observable<Employee[]> {
    return this.httpClient.get<Employee[]>('/employees');
  }
}

export interface Employee {
  id: string;
  name: string;
}

Constatations

Et lançons alors l'application nous avons le comportement suivant:


Basiquement que constatons-nous ?  Tout simplement que nous perdons la notion de réactivité car tout simplement l'appel l'observable générée à travers une requête Ajax va attendre la fin de la réponse HTTP avant de continuer.

En soit, c'est un peu dommage, car par exemple pour des raisons de "latences", nous pourrions imaginer de transmettre les données au fur et à mesure que nous les recevons.

"application/stream+json"

Rien n'est perdu, car nous pouvons utiliser Spring Flux en indiquant que nous voulons une réponse avec un content type à "application/stream+json". Le principe est simple: dès que possible nous essayons d'envoyer une ligne contenant un JSON à analyser.

Le hic de cela, c'est qu'entre deux applications Spring Flux, cela fonctionne naturellement. Entre un serveur Spring Flux et un navigateur, ce n'est pas la même chose.

Essayons toutefois de spécifier ce format dans notre service:

import {Injectable} from '@angular/core';
import {HttpClient} from '@angular/common/http';
import {Observable} from 'rxjs';

@Injectable({
  providedIn: 'root'
})
export class EmployeeService {

  constructor(
    private httpClient: HttpClient
  ) { }

  getEmployees(): Observable<Employee[]> {
    return this.httpClient.get<Employee[]>('/employees');
  }

  getStreamedEmployees(): Observable<Employee[]> {
    return this.httpClient.get<Employee[]>(
      '/employees',
      {
        headers: {
          'Accept': 'application/stream+json'
        }
      }
    );
  }
}

export interface Employee {
  id: string;
  name: string;
}


Quand nous l'utilisons, voici ce qui se passe:


Dommage ... Mais prévisible: l'observable va toujours attendre la fin de la requête HTTP et va malheureusement planter car le contenu ne ressemble pas à un JSON classique.

Du coup, nous sommes coincés ?

XHR2 à la rescousse

Il existe un version 2 de XHR permettant d'avoir des features plus avancées que d'habitude. Notamment une notion de "progress event" qui permettent d'être prévenu à chaque fois que nous recevons des données. Cela est notamment utile pour faire le suivi d'un upload / download de fichier.

Et cela est plutôt bien supporté: https://caniuse.com/#feat=xhr2


Maintenant la question que nous pouvons nous dire est: comment l'implémenter ? En VanillaJS, ce n'est pas simple, tout comme en Angular.

Il existe néanmoins une librairie très sympa (et simple à utiliser), oboe.js qui permet d'utiliser cela, aussi bien dans NodeJs que dans son navigateur: http://oboejs.com/



Mise en application

Modifions notre service afin d'utiliser Oboe.js:

import { Injectable } from '@angular/core';
import {HttpClient} from '@angular/common/http';
import {BehaviorSubject, Observable} from 'rxjs';
import * as oboe from 'oboe/dist/oboe-browser';
import {filter} from 'rxjs/operators';

@Injectable({
  providedIn: 'root'
})
export class EmployeeService {

  constructor(
    private httpClient: HttpClient
  ) { }

  getEmployees(): Observable<Employee[]> {
    return this.httpClient.get<Employee[]>('/employees');
  }

  getStreamedEmployees(): Observable<Employee[]> {
    return this.httpClient.get<Employee[]>(
      '/employees',
      {
        headers: {
          'Accept': 'application/stream+json'
        }
      }
    );
  }

  getRealStreamedEmployees(): Observable<Employee> {
    const employees$ = new BehaviorSubject(null);
    const oboeInstance = oboe({
      url: '/employees',
      headers: {
        'Accept': 'application/stream+json'
      }
    });

    oboeInstance
      .done((employee) => employees$.next(employee))
      .fail((err) => employees$.error(err))
      .addListener('end', () => employees$.complete());

    return employees$.pipe(
      filter((employee) => !!employee)
    );
  }
}

export interface Employee {
  id: string;
  name: string;
}


Notons plusieurs choses:
  1. Nous allons utiliser un BehaviorSubject afin de créer un Observable qui recevra au fur et à mesure une nouvelle entrée
  2. Le service retourne un observable de type Employee, et plus un tableau d'Employee
  3. Comme nous devons définir une valeur par défaut sur le BehaviorSubject , nous mettons null mais du coup, nous faisons en sorte de ne pas remonter cette valeur en filtrant les valeurs à null.
  4. Nous instancions Obeo en précisant le fameux "application/stream+json"
  5. "done" permettant d'être appelé à chaque fois que nous recevons du nouveau contenu JSON (nous pourrions faire du filtrage, mais ici, ce n'est pas notre but)
  6. "fail" est utiliser en cas de soucis et du coup nous transmettons l'information si nécessaire à l'observable
  7. "addListener" permet de détecter ici la fermeture de l'appel HTTP. Et ainsi, nous fermons notre observable car nous savons que plus aucune donnée ne sera remontée.
Vu que nous avons des données qui sont remontées au fur et à mesure, nous devons modifier légèrement notre composant:


import {Component} from '@angular/core';
import {Employee, EmployeeService} from '../service/employee.service';

@Component({
  selector: 'app-step3',
  templateUrl: './step3.component.html',
  styleUrls: ['./step3.component.css']
})
export class Step3Component {
  employees: Employee[] = [];
  isError = false;

  constructor(
    private employeeService: EmployeeService
  ) { }

  load() {
    this.employeeService
      .getRealStreamedEmployees()
      .subscribe(
        (employee) => {
          this.employees.push(employee);
        },
        (err) => {
          console.error(err);
          this.isError = true;
        }
      );
  }
}


Et voilà ce que cela donne:



Ici, nous avons une approche full réactive, où nous affichons les données au fur et à mesure qu'elles arrivent, ce qui est plutôt pratique.

Commentaires

Posts les plus consultés de ce blog

ISO: liens & outils utiles

NodeJs et SSL: une petite analyse

Créer sa commande slack en quelques minutes