Események a Spring Modulith-ban
Következő videóm témája, hogy egy modularizált alkalmazás felépítésekor az eseménykezelés segít nekünk abban, hogy a modulok lazán kapcsolódjanak egymáshoz. A Spring Modulith ezt kiegészíti, biztosabbá teszi a hiba- és tranzakciókezelést, valamint képes ezeket az eseményeket más service-ek felé is elküldeni valamilyen message brokeren keresztül.
Lesz szó modularizált alkalmazásról, eseménykezelésről, Spring Modulithról, tranzakciókezelésről, Testcontainersről, Kafkáról.
A példa alkalmazás forráskódja megtalálható a GitHubon.
A Spring Modulit modulkezeléséről írtam korábban egy posztot Modularizált alkalmazás fejlesztése a Spring Modulith-tal címmel.
A poszt végén a videóhoz képest extra tartalom is van.
Modulok közötti kommunikációra eseményeket használva laza lesz közöttük a kapcsolat, feloldhatóak a körkörös függőségek, sőt a tesztelés is egyszerűbbé válik. A Spring Framework alapból tartalmaz eseményküldést és fogadást, a Spring Modulith ezt kiegészíti.
Adott egy több modulból álló alkalmazás. Az Employees
modul az alkalmazottakat,
a Skills
a képzettségeket, és az AcquiredSkills
pedig az alkalmazottakhoz kapcsolódó
képzettségeket kezeli.
Az Employees
modul az alkalmazott törlésekor
dob egy eseményt az EmployeeService
deleteEmployee()
metódusában,
az injektált ApplicationEventPublisher
.
@Transactional
public void deleteEmployee(long id) {
Employee employee = employeeRepository.findByIdWithAddresses(id)
.orElseThrow();
employeeRepository.delete(employee);
publisher.publishEvent(new EmployeeHasBeenDeletedEvent(id));
}
Az eseményt a képzettségeket kezelő modul fogadja
az AcquiredSkillsService
handleEmployeeHasBeenDeletedEvent()
metódusában, és törli az
alkalmazotthoz kapcsolódó képzettségeket.
@EventListener
public void handleEmployeeHasBeenDeletedEvent(EmployeeHasBeenDeletedEvent event) {
log.info("Event has arrived: {}", event);
var employeeSkills = employeeSkillsRepository.findByEmployeeId(event.employeeId());
employeeSkills.ifPresent(skills -> employeeSkillsRepository.delete(skills));
}
Tranzakciókezelés
Ez alapértelmezetten szinkron módon fut, abban a tranzakcióban, amelyben az eseményt eldobó metódus is fut.
Ezért érdemes rátenni az @Async
annotációt, mely külön szálon futtatja, valamint
a TransactionalEventListener
annotációt, mely azt biztosítja, hogy az előző tranzakció
commitja után fusson le, valamint a @Transactional(propagation = Propagation.REQUIRES_NEW)
annotációt, ami azért felelős, hogy saját tranzakciót indítson.
@Async
@TransactionalEventListener
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleEmployeeHasBeenDeletedEvent(EmployeeHasBeenDeletedEvent event) {
// ...
}
Ehelyett felveszek egy új függőséget:
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-api</artifactId>
</dependency>
És a három annotáció
helyett használhatom a Spring Modulith @ApplicationModuleListener
annotációját,
mely önmaga tartalmazza mindhárom másik annotációt.
@ApplicationModuleListener
public void handleEmployeeHasBeenDeletedEvent(EmployeeHasBeenDeletedEvent event) {
// ...
}
Event Publication Repository
Ekkor még mindig megtörténhet az, hogy az esemény feldolgozása közben hiba lép fel, és az esemény elveszik.
Ennek kivédésére az Event Publication Repository az eseményt kiírja adatbázisba is. Ehhez használható a következő függőség.
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-starter-jpa</artifactId>
</dependency>
Ha most létrehozunk, majd törlünk egy felhasználót, így küldünk eseményt,
akkor látható, hogy bekerül az event_publication
táblába is JSON formátumban,
kitöltött completition_date
mezővel.
Az alkalmazásba vegyük fel a Spring Boot Chaos Monkey projektet, mely segítségével
hibát lehet tenni a rendszerbe. Az Actuatorjának használatával beállítható, hogy
az AcquiredSkillsService
handleEmployeeHasDeletedEvent()
metódusa dobjon RuntimeException
-t.
Ekkor a táblába az esemény üres completition_date
értékkel kerül be.
Ezeket az alkalmazás indulásakor próbálja újra elküldeni, ha be van állítva a spring.modulith.republish-outstanding-events-on-restart
property.
Extra tartalom: Saját mechanizmus
A régi eseményeket bizonyos időközönként törölni érdemes, erre használható
egy injektált CompletedEventPublications
példány.
A nem feldolgozott eseményeket programozottan is újraküldhetjük,
ehhez egy IncompleteEventPublications
példányt kell injektálni, aminek pl.
ütemezve meghívható a resubmitIncompletePublicationsOlderThan
metódusa.
@Service
@AllArgsConstructor
@Slf4j
public class EventsService {
private final CompletedEventPublications completedEventPublications;
private final IncompleteEventPublications incompleteEventPublications;
@Scheduled(fixedRate = 5000)
public void logCompletedEventPublications() {
completedEventPublications.findAll().stream().forEach(completedEvent -> log.debug("Completed event: {}", completedEvent));
}
@Scheduled(fixedRate = 5000)
public void retryIncompleteEventPublications() {
incompleteEventPublications.resubmitIncompletePublicationsOlderThan(Duration.ZERO);
}
}
Esemény küldése message brokeren
Erre az eseményre más service-ek is kíváncsiak lehetnek, ezért egy Kafka topicba is el lehet küldeni. Ehhez fel kell venni a következő függőséget.
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-kafka</artifactId>
</dependency>
Valamint az eseményre rátenni a @Externalized
annotációt.
Ekkor a Kafkában, létrejön egy
EmployeeHasBeenDeletedEvent
topic, és az alkalmazott törlésekor megjelenik benne egy üzenet.