- websocket client on ESP32
- telemetry protocol builder - server-side message handling - database migrations - telemetry persistence
This commit is contained in:
@@ -16,14 +16,31 @@ dependencies {
|
||||
implementation("io.ktor:ktor-server-content-negotiation:2.3.7")
|
||||
implementation("io.ktor:ktor-serialization-kotlinx-json:2.3.7")
|
||||
implementation("io.ktor:ktor-server-websockets:2.3.7")
|
||||
|
||||
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.2")
|
||||
|
||||
implementation("ch.qos.logback:logback-classic:1.4.14")
|
||||
|
||||
implementation("org.mariadb.jdbc:mariadb-java-client:3.3.3")
|
||||
implementation("com.zaxxer:HikariCP:5.1.0")
|
||||
testImplementation(kotlin("test"))
|
||||
}
|
||||
|
||||
application {
|
||||
mainClass.set("org.pavloveugene.iot.backend.ApplicationKt")
|
||||
}
|
||||
|
||||
tasks.jar {
|
||||
manifest {
|
||||
attributes["Main-Class"] = "org.pavloveugene.iot.backend.ApplicationKt"
|
||||
}
|
||||
|
||||
from({
|
||||
configurations.runtimeClasspath.get().map { zipTree(it) }
|
||||
})
|
||||
|
||||
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
|
||||
}
|
||||
|
||||
tasks.processResources {
|
||||
from("../db/migrations") {
|
||||
into("db/migration")
|
||||
}
|
||||
}
|
||||
@@ -10,15 +10,22 @@ import io.ktor.server.websocket.*
|
||||
import org.pavloveugene.iot.backend.config.AppConfig
|
||||
import org.pavloveugene.iot.backend.config.configureSerialization
|
||||
import org.pavloveugene.iot.backend.config.configureWebSockets
|
||||
import org.pavloveugene.iot.backend.db.Database
|
||||
import org.pavloveugene.iot.backend.db.Migration
|
||||
import org.pavloveugene.iot.backend.db.runMigrations
|
||||
import java.time.Duration
|
||||
import org.pavloveugene.iot.backend.routes.*
|
||||
|
||||
fun main() {
|
||||
embeddedServer(
|
||||
Database.dataSource
|
||||
|
||||
runMigrations()
|
||||
|
||||
val server = embeddedServer(
|
||||
Netty,
|
||||
port = AppConfig.serverPort,
|
||||
host = AppConfig.serverHost,
|
||||
){
|
||||
) {
|
||||
install(ContentNegotiation) {
|
||||
json()
|
||||
}
|
||||
@@ -34,7 +41,16 @@ fun main() {
|
||||
protocolRoutes()
|
||||
protocolWebSocket()
|
||||
}
|
||||
}.start(wait = true)
|
||||
}
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(Thread {
|
||||
println("Shutting down...")
|
||||
try {
|
||||
server.stop(1000, 2000)
|
||||
} catch (e: Exception) {
|
||||
println("Shutdown error: ${e.message}")
|
||||
}
|
||||
})
|
||||
|
||||
server.start(wait = true)
|
||||
}
|
||||
|
||||
@@ -12,4 +12,9 @@ object AppConfig {
|
||||
|
||||
val apiPrefix: String = config.getString("api.prefix")
|
||||
val wsPath: String = config.getString("ws.path")
|
||||
|
||||
val dbUrl = config.getString("ktor.database.url")
|
||||
val dbUser = config.getString("ktor.database.user")
|
||||
val dbPassword = config.getString("ktor.database.password")
|
||||
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package org.pavloveugene.iot.backend.db
|
||||
|
||||
import com.zaxxer.hikari.HikariConfig
|
||||
import com.zaxxer.hikari.HikariDataSource
|
||||
import org.pavloveugene.iot.backend.config.AppConfig
|
||||
import kotlin.getValue
|
||||
|
||||
object Database {
|
||||
|
||||
val dataSource: HikariDataSource by lazy {
|
||||
val config = HikariConfig().apply {
|
||||
jdbcUrl = AppConfig.dbUrl
|
||||
driverClassName = "org.mariadb.jdbc.Driver"
|
||||
username = AppConfig.dbUser
|
||||
password = AppConfig.dbPassword
|
||||
|
||||
maximumPoolSize = 10
|
||||
minimumIdle = 2
|
||||
connectionTimeout = 10000
|
||||
idleTimeout = 30000
|
||||
maxLifetime = 1800000
|
||||
|
||||
isAutoCommit = false
|
||||
}
|
||||
|
||||
HikariDataSource(config)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,123 @@
|
||||
package org.pavloveugene.iot.backend.db
|
||||
|
||||
data class Migration(
|
||||
val version: Int,
|
||||
val sql: String
|
||||
)
|
||||
|
||||
fun loadMigrations(): List<Migration> {
|
||||
val cl = Thread.currentThread().contextClassLoader
|
||||
|
||||
val resources = cl.getResources("db/migration")
|
||||
val result = mutableListOf<Migration>()
|
||||
|
||||
while (resources.hasMoreElements()) {
|
||||
val url = resources.nextElement()
|
||||
|
||||
val uri = url.toURI()
|
||||
|
||||
if (uri.scheme == "file") {
|
||||
// обычный запуск из IDE
|
||||
val dir = java.nio.file.Paths.get(uri)
|
||||
|
||||
java.nio.file.Files.list(dir).forEach { path ->
|
||||
val name = path.fileName.toString()
|
||||
val m = parseMigrationName(name) ?: return@forEach
|
||||
|
||||
val sql = java.nio.file.Files.readString(path)
|
||||
result.add(Migration(m, sql))
|
||||
}
|
||||
} else if (uri.scheme == "jar") {
|
||||
// запуск из jar
|
||||
val fs = java.nio.file.FileSystems.newFileSystem(uri, emptyMap<String, Any>())
|
||||
val dir = fs.getPath("db/migration")
|
||||
|
||||
java.nio.file.Files.list(dir).forEach { path ->
|
||||
val name = path.fileName.toString()
|
||||
val m = parseMigrationName(name) ?: return@forEach
|
||||
|
||||
val sql = java.nio.file.Files.readString(path)
|
||||
result.add(Migration(m, sql))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result.sortedBy { it.version }
|
||||
}
|
||||
|
||||
fun parseMigrationName(name: String): Int? {
|
||||
val regex = Regex("""V(\d+)__.*\.sql""")
|
||||
val match = regex.matchEntire(name) ?: return null
|
||||
return match.groupValues[1].toInt()
|
||||
}
|
||||
|
||||
fun runMigrations() {
|
||||
val ds = Database.dataSource
|
||||
|
||||
ds.connection.use { conn ->
|
||||
|
||||
// создаём таблицу
|
||||
conn.createStatement().use {
|
||||
it.execute("""
|
||||
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||
version INT PRIMARY KEY,
|
||||
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
""".trimIndent())
|
||||
}
|
||||
|
||||
// читаем применённые
|
||||
val applied = mutableSetOf<Int>()
|
||||
|
||||
conn.createStatement().use { st ->
|
||||
val rs = st.executeQuery("SELECT version FROM schema_migrations")
|
||||
while (rs.next()) {
|
||||
applied.add(rs.getInt(1))
|
||||
}
|
||||
}
|
||||
|
||||
val migrations = loadMigrations()
|
||||
|
||||
for (m in migrations) {
|
||||
if (m.version in applied) continue
|
||||
|
||||
println("Applying migration V${m.version}")
|
||||
|
||||
try {
|
||||
conn.autoCommit = false
|
||||
|
||||
val statements = splitSql(m.sql)
|
||||
|
||||
for (stmt in statements) {
|
||||
|
||||
println("Applying migration statement:\n $stmt\n========================")
|
||||
|
||||
conn.createStatement().use { st ->
|
||||
st.execute(stmt)
|
||||
}
|
||||
}
|
||||
|
||||
conn.prepareStatement(
|
||||
"INSERT INTO schema_migrations(version) VALUES (?)"
|
||||
).use {
|
||||
it.setInt(1, m.version)
|
||||
it.executeUpdate()
|
||||
}
|
||||
|
||||
conn.commit()
|
||||
} catch (e: Exception) {
|
||||
conn.rollback()
|
||||
throw RuntimeException("Migration V${m.version} failed", e)
|
||||
} finally {
|
||||
conn.autoCommit = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun splitSql(sql: String): List<String> {
|
||||
return sql
|
||||
.split(";")
|
||||
.map { it.trim() }
|
||||
.filter { it.isNotEmpty() }
|
||||
}
|
||||
@@ -1,15 +1,23 @@
|
||||
package org.pavloveugene.iot.backend.dto
|
||||
|
||||
import kotlinx.serialization.SerialName
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.json.JsonObject
|
||||
import kotlinx.serialization.json.JsonElement
|
||||
|
||||
|
||||
@Serializable
|
||||
data class BaseMessageDto(
|
||||
val v: Int,
|
||||
val id: String,
|
||||
val type: String,
|
||||
val id: UInt,
|
||||
val t: MessageType,
|
||||
val ts: Long,
|
||||
val deviceId: String,
|
||||
val payload: JsonObject
|
||||
)
|
||||
val d: UInt,
|
||||
val p: JsonElement
|
||||
)
|
||||
|
||||
@Serializable
|
||||
enum class MessageType {
|
||||
@SerialName("t") TELEMETRY,
|
||||
@SerialName("e") EVENT,
|
||||
@SerialName("c") COMMAND
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
import kotlinx.serialization.SerialName
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.json.JsonElement
|
||||
|
||||
@kotlinx.serialization.Serializable
|
||||
data class MessageDto(
|
||||
val v: Int,
|
||||
val id: UInt,
|
||||
val t: MessageType,
|
||||
val ts: Long,
|
||||
val d: UInt,
|
||||
val p: JsonElement
|
||||
)
|
||||
|
||||
@Serializable
|
||||
enum class MessageType {
|
||||
@SerialName("t") TELEMETRY,
|
||||
@SerialName("e") EVENT,
|
||||
@SerialName("c") COMMAND
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package org.pavloveugene.iot.backend.dto
|
||||
|
||||
import kotlinx.serialization.Serializable
|
||||
|
||||
@Serializable
|
||||
data class TelemetryDto(
|
||||
val m: String,
|
||||
val s: String,
|
||||
val u: String,
|
||||
val v: List<List<Double>>
|
||||
)
|
||||
@@ -1,10 +0,0 @@
|
||||
package org.pavloveugene.iot.backend.dto
|
||||
|
||||
import kotlinx.serialization.Serializable
|
||||
|
||||
@Serializable
|
||||
data class TelemetryPayloadDto(
|
||||
val voltage: Double? = null,
|
||||
val current: Double? = null,
|
||||
val power: Double? = null
|
||||
)
|
||||
@@ -1,12 +1,16 @@
|
||||
package org.pavloveugene.iot.backend.routes
|
||||
|
||||
import MessageDto
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.server.request.*
|
||||
import io.ktor.server.response.*
|
||||
import io.ktor.server.routing.*
|
||||
import io.ktor.http.*
|
||||
import kotlinx.serialization.json.Json
|
||||
import org.pavloveugene.iot.backend.dto.TelemetryDto
|
||||
import kotlinx.serialization.json.decodeFromJsonElement
|
||||
import org.pavloveugene.iot.backend.config.AppConfig
|
||||
import org.pavloveugene.iot.backend.dto.ProtocolMessage
|
||||
import org.pavloveugene.iot.backend.services.ProtocolService
|
||||
|
||||
fun Route.protocolRoutes() {
|
||||
route(AppConfig.apiPrefix+"/protocol") {
|
||||
@@ -15,10 +19,17 @@ fun Route.protocolRoutes() {
|
||||
}
|
||||
|
||||
post("/message") {
|
||||
val message = call.receive<ProtocolMessage>()
|
||||
// TODO: обработка сообщения
|
||||
println("Received message: $message")
|
||||
call.respond(HttpStatusCode.Accepted, mapOf("received" to message.id))
|
||||
val json = Json {
|
||||
ignoreUnknownKeys = false
|
||||
}
|
||||
|
||||
val protocolService = ProtocolService(json)
|
||||
|
||||
val msg = call.receive<MessageDto>()
|
||||
|
||||
protocolService.handleMessage(msg)
|
||||
|
||||
call.respond(HttpStatusCode.Accepted)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,12 @@
|
||||
package org.pavloveugene.iot.backend.routes
|
||||
|
||||
import MessageDto
|
||||
import io.ktor.server.websocket.*
|
||||
import io.ktor.server.routing.*
|
||||
import io.ktor.websocket.*
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.server.response.*
|
||||
import io.ktor.utils.io.CancellationException
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.json.JsonObject
|
||||
import kotlinx.serialization.decodeFromString
|
||||
@@ -12,37 +14,57 @@ import kotlinx.serialization.encodeToString
|
||||
import kotlinx.serialization.json.encodeToJsonElement
|
||||
import org.pavloveugene.iot.backend.config.AppConfig
|
||||
import org.pavloveugene.iot.backend.dto.ProtocolMessage
|
||||
import org.pavloveugene.iot.backend.dto.TelemetryDto
|
||||
import org.pavloveugene.iot.backend.services.ProtocolService
|
||||
import java.io.IOException
|
||||
import java.time.Duration
|
||||
|
||||
fun Route.protocolWebSocket() {
|
||||
val json = Json { prettyPrint = true }
|
||||
val json = Json {
|
||||
ignoreUnknownKeys = false
|
||||
}
|
||||
|
||||
val protocolService = ProtocolService(json)
|
||||
|
||||
webSocket(AppConfig.wsPath) {
|
||||
send("Connected to IoT backend WebSocket")
|
||||
println("WS connected")
|
||||
|
||||
for (frame in incoming) {
|
||||
when (frame) {
|
||||
is Frame.Text -> {
|
||||
try {
|
||||
for (frame in incoming) {
|
||||
if (frame is Frame.Text) {
|
||||
val text = frame.readText()
|
||||
try {
|
||||
val message = json.decodeFromString<ProtocolMessage>(text)
|
||||
println("Received WS message: $message")
|
||||
// Эхо обратно
|
||||
val response = ProtocolMessage(
|
||||
v = message.v,
|
||||
id = message.id,
|
||||
type = "response",
|
||||
ts = System.currentTimeMillis(),
|
||||
deviceId = message.deviceId,
|
||||
payload = JsonObject(mapOf("status" to Json.encodeToJsonElement("ok")))
|
||||
)
|
||||
send(json.encodeToString(response))
|
||||
|
||||
val msg = try {
|
||||
json.decodeFromString<MessageDto>(text)
|
||||
} catch (e: Exception) {
|
||||
send("Invalid message format: ${e.message}")
|
||||
println("WS decode error: ${e.message}")
|
||||
safeSend("""{"error":"invalid"}""")
|
||||
continue
|
||||
}
|
||||
|
||||
try {
|
||||
protocolService.handleMessage(msg)
|
||||
} catch (e: Exception) {
|
||||
println("WS handler error: ${e.message}")
|
||||
}
|
||||
}
|
||||
else -> {}
|
||||
}
|
||||
} catch (e: CancellationException) {
|
||||
// нормальный shutdown — молчим
|
||||
} catch (e: IOException) {
|
||||
println("WS disconnected: ${e.message}")
|
||||
} catch (e: Exception) {
|
||||
println("WS error: ${e.message}")
|
||||
} finally {
|
||||
println("WS disconnected")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun DefaultWebSocketServerSession.safeSend(text: String) {
|
||||
try {
|
||||
send(text)
|
||||
} catch (_: Exception) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
@@ -1,15 +1,31 @@
|
||||
package org.pavloveugene.iot.backend.routes
|
||||
|
||||
import io.ktor.http.HttpStatusCode
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.server.response.*
|
||||
import io.ktor.server.routing.*
|
||||
import org.pavloveugene.iot.backend.db.Database
|
||||
|
||||
fun Application.configureRouting() {
|
||||
routing {
|
||||
get("/api/v1/health") {
|
||||
call.respond(mapOf(
|
||||
"status" to "ok"
|
||||
))
|
||||
try {
|
||||
Database.dataSource.connection.use { conn ->
|
||||
conn.createStatement().execute("SELECT 1")
|
||||
}
|
||||
call.respondText("OK")
|
||||
} catch (e: Exception) {
|
||||
call.respond(HttpStatusCode.InternalServerError, "DB ERROR")
|
||||
}
|
||||
}
|
||||
get("/api/v1/ready") {
|
||||
// например:
|
||||
// - миграции применены
|
||||
// - WS сервис готов
|
||||
call.respondText("READY")
|
||||
}
|
||||
get("/live") {
|
||||
call.respondText("ALIVE")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,100 @@
|
||||
package org.pavloveugene.iot.backend.services
|
||||
|
||||
import MessageDto
|
||||
import kotlinx.serialization.builtins.ListSerializer
|
||||
import kotlinx.serialization.builtins.serializer
|
||||
import kotlinx.serialization.json.Json
|
||||
import org.pavloveugene.iot.backend.db.Database
|
||||
import org.pavloveugene.iot.backend.dto.TelemetryDto
|
||||
|
||||
class ProtocolService(
|
||||
private val json: Json
|
||||
) {
|
||||
|
||||
fun handleMessage(msg: MessageDto) {
|
||||
when (msg.t) {
|
||||
MessageType.TELEMETRY -> {
|
||||
handleTelemetry(msg)
|
||||
}
|
||||
|
||||
MessageType.EVENT -> {
|
||||
println("=== EVENT ===")
|
||||
println(msg.p)
|
||||
}
|
||||
|
||||
MessageType.COMMAND -> {
|
||||
println("=== COMMAND ===")
|
||||
println(msg.p)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun handleTelemetry(msg: MessageDto) {
|
||||
val ds = Database.dataSource
|
||||
|
||||
ds.connection.use { conn ->
|
||||
conn.autoCommit = false
|
||||
|
||||
try {
|
||||
// ensure device
|
||||
conn.prepareStatement("""
|
||||
INSERT INTO devices(id)
|
||||
VALUES (?)
|
||||
ON DUPLICATE KEY UPDATE id = id
|
||||
""").use {
|
||||
it.setLong(1, msg.d.toLong())
|
||||
it.executeUpdate()
|
||||
}
|
||||
|
||||
// check enabled
|
||||
val isEnabled = conn.prepareStatement("""
|
||||
SELECT is_enabled FROM devices WHERE id = ?
|
||||
""").use {
|
||||
it.setLong(1, msg.d.toLong())
|
||||
val rs = it.executeQuery()
|
||||
if (rs.next()) rs.getBoolean(1) else false
|
||||
}
|
||||
|
||||
if (!isEnabled) {
|
||||
println("device ${msg.d} locked, message ignored")
|
||||
conn.commit()
|
||||
return
|
||||
}
|
||||
|
||||
val payload = json.decodeFromJsonElement(TelemetryDto.serializer(), msg.p)
|
||||
|
||||
println("=== TELEMETRY ===")
|
||||
println("device=${msg.d}")
|
||||
println("ts=${msg.ts}")
|
||||
println("metric=${payload.m}")
|
||||
println("values=${payload.v}")
|
||||
|
||||
// insert telemetry
|
||||
conn.prepareStatement("""
|
||||
INSERT INTO telemetry(
|
||||
device_id, ts, metric, source, unit, payload
|
||||
) VALUES (?, ?, ?, ?, ?, ?)
|
||||
""").use {
|
||||
it.setLong(1, msg.d.toLong())
|
||||
it.setLong(2, msg.ts)
|
||||
it.setString(3, payload.m)
|
||||
it.setString(4, payload.s)
|
||||
it.setString(5, payload.u)
|
||||
|
||||
val payloadJson = json.encodeToString( ListSerializer(ListSerializer(Double.serializer())),payload.v)
|
||||
it.setString(6, payloadJson)
|
||||
|
||||
it.executeUpdate()
|
||||
}
|
||||
|
||||
conn.commit()
|
||||
|
||||
} catch (e: Exception) {
|
||||
conn.rollback()
|
||||
throw e
|
||||
} finally {
|
||||
conn.autoCommit = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,11 @@
|
||||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<document>
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{HH:mm:ss} %-5level %logger - %msg%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
</document>
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT"/>
|
||||
</root>
|
||||
</configuration>
|
||||
Reference in New Issue
Block a user