HB, OTA etc
This commit is contained in:
BIN
backend/backend.zip
Normal file
BIN
backend/backend.zip
Normal file
Binary file not shown.
@@ -20,6 +20,7 @@ dependencies {
|
||||
implementation("ch.qos.logback:logback-classic:1.4.14")
|
||||
implementation("org.mariadb.jdbc:mariadb-java-client:3.3.3")
|
||||
implementation("com.zaxxer:HikariCP:5.1.0")
|
||||
implementation("io.ktor:ktor-server-compression:2.3.7")
|
||||
testImplementation(kotlin("test"))
|
||||
}
|
||||
|
||||
@@ -43,4 +44,49 @@ tasks.processResources {
|
||||
from("../db/migrations") {
|
||||
into("db/migration")
|
||||
}
|
||||
}
|
||||
|
||||
tasks.register("deploy") {
|
||||
dependsOn("jar")
|
||||
|
||||
doLast {
|
||||
val jarFile = file("build/libs/backend.jar")
|
||||
|
||||
if (!jarFile.exists()) {
|
||||
throw GradleException("Jar not found: ${jarFile.absolutePath}")
|
||||
}
|
||||
|
||||
fun runCommand(vararg cmd: String) {
|
||||
println("Running: ${cmd.joinToString(" ")}")
|
||||
|
||||
val process = ProcessBuilder(*cmd)
|
||||
.inheritIO()
|
||||
.start()
|
||||
|
||||
val exitCode = process.waitFor()
|
||||
if (exitCode != 0) {
|
||||
throw GradleException("Command failed: ${cmd.joinToString(" ")}")
|
||||
}
|
||||
}
|
||||
|
||||
// upload
|
||||
runCommand(
|
||||
"scp",
|
||||
jarFile.absolutePath,
|
||||
"home-iot:/tmp/backend.jar"
|
||||
)
|
||||
|
||||
// deploy
|
||||
runCommand(
|
||||
"ssh",
|
||||
"home-iot",
|
||||
"sudo mv /tmp/backend.jar /opt/iot-backend/app.jar && sudo systemctl restart iot-backend"
|
||||
)
|
||||
|
||||
runCommand("ssh",
|
||||
"home-iot",
|
||||
"systemctl status iot-backend --no-pager")
|
||||
|
||||
println("Deploy completed")
|
||||
}
|
||||
}
|
||||
@@ -15,42 +15,32 @@ import org.pavloveugene.iot.backend.db.Migration
|
||||
import org.pavloveugene.iot.backend.db.runMigrations
|
||||
import java.time.Duration
|
||||
import org.pavloveugene.iot.backend.routes.*
|
||||
import org.pavloveugene.iot.backend.services.executeCleanup
|
||||
import org.pavloveugene.iot.backend.services.runNormalizeLoop
|
||||
import org.pavloveugene.iot.backend.services.startKtorServer
|
||||
import kotlin.system.exitProcess
|
||||
|
||||
fun main() {
|
||||
Database.dataSource
|
||||
fun main(args: Array<String>) {
|
||||
|
||||
Database.init()
|
||||
|
||||
runMigrations()
|
||||
|
||||
val server = embeddedServer(
|
||||
Netty,
|
||||
port = AppConfig.serverPort,
|
||||
host = AppConfig.serverHost,
|
||||
) {
|
||||
install(ContentNegotiation) {
|
||||
json()
|
||||
val mode = args.firstOrNull()
|
||||
|
||||
when (mode) {
|
||||
"--normalize-data", "normalize-data" -> {
|
||||
runNormalizeLoop()
|
||||
}
|
||||
|
||||
install(WebSockets) {
|
||||
pingPeriod = Duration.ofSeconds(15)
|
||||
timeout = Duration.ofSeconds(30)
|
||||
maxFrameSize = Long.MAX_VALUE
|
||||
masking = false
|
||||
"--cleanup", "cleanup" -> {
|
||||
executeCleanup()
|
||||
}
|
||||
|
||||
routing {
|
||||
protocolRoutes()
|
||||
protocolWebSocket()
|
||||
else -> {
|
||||
startKtorServer()
|
||||
}
|
||||
}
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(Thread {
|
||||
println("Shutting down...")
|
||||
try {
|
||||
server.stop(1000, 2000)
|
||||
} catch (e: Exception) {
|
||||
println("Shutdown error: ${e.message}")
|
||||
}
|
||||
})
|
||||
|
||||
server.start(wait = true)
|
||||
exitProcess(0)
|
||||
}
|
||||
|
||||
@@ -3,9 +3,13 @@ package org.pavloveugene.iot.backend.db
|
||||
import com.zaxxer.hikari.HikariConfig
|
||||
import com.zaxxer.hikari.HikariDataSource
|
||||
import org.pavloveugene.iot.backend.config.AppConfig
|
||||
import java.sql.Statement
|
||||
import kotlin.getValue
|
||||
|
||||
object Database {
|
||||
fun init() {
|
||||
val ds = dataSource;
|
||||
}
|
||||
|
||||
val dataSource: HikariDataSource by lazy {
|
||||
val config = HikariConfig().apply {
|
||||
@@ -20,9 +24,93 @@ object Database {
|
||||
idleTimeout = 30000
|
||||
maxLifetime = 1800000
|
||||
|
||||
isAutoCommit = false
|
||||
isAutoCommit = true
|
||||
}
|
||||
|
||||
HikariDataSource(config)
|
||||
}
|
||||
|
||||
fun execute(sql: String, params: List<Any?> = emptyList()): Int {
|
||||
dataSource.connection.use { conn ->
|
||||
conn.prepareStatement(sql).use { stmt ->
|
||||
|
||||
params.forEachIndexed { i, p ->
|
||||
stmt.setObject(i + 1, p)
|
||||
}
|
||||
|
||||
return stmt.executeUpdate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun queryOne(sql: String, params: List<Any?> = emptyList()): Map<String, Any?>? {
|
||||
dataSource.connection.use { conn ->
|
||||
conn.prepareStatement(sql).use { stmt ->
|
||||
|
||||
params.forEachIndexed { i, p ->
|
||||
stmt.setObject(i + 1, p)
|
||||
}
|
||||
|
||||
val rs = stmt.executeQuery()
|
||||
if (!rs.next()) return null
|
||||
|
||||
val meta = rs.metaData
|
||||
val map = mutableMapOf<String, Any?>()
|
||||
|
||||
for (i in 1..meta.columnCount) {
|
||||
map[meta.getColumnLabel(i)] = rs.getObject(i)
|
||||
}
|
||||
|
||||
return map
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun query(sql: String, params: List<Any?> = emptyList()): List<Map<String, Any?>> {
|
||||
dataSource.connection.use { conn ->
|
||||
conn.prepareStatement(sql).use { stmt ->
|
||||
|
||||
params.forEachIndexed { i, p ->
|
||||
stmt.setObject(i + 1, p)
|
||||
}
|
||||
|
||||
val rs = stmt.executeQuery()
|
||||
val meta = rs.metaData
|
||||
|
||||
val result = mutableListOf<Map<String, Any?>>()
|
||||
|
||||
while (rs.next()) {
|
||||
val map = mutableMapOf<String, Any?>()
|
||||
|
||||
for (i in 1..meta.columnCount) {
|
||||
map[meta.getColumnLabel(i)] = rs.getObject(i)
|
||||
}
|
||||
|
||||
result.add(map)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun insertAndReturnId(sql: String, params: List<Any?> = emptyList()): Long {
|
||||
dataSource.connection.use { conn ->
|
||||
conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS).use { stmt ->
|
||||
|
||||
params.forEachIndexed { i, p ->
|
||||
stmt.setObject(i + 1, p)
|
||||
}
|
||||
|
||||
stmt.executeUpdate()
|
||||
|
||||
val rs = stmt.generatedKeys
|
||||
if (rs.next()) {
|
||||
return rs.getLong(1)
|
||||
} else {
|
||||
error("No generated key")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
package org.pavloveugene.iot.backend.dto
|
||||
|
||||
import kotlinx.serialization.Serializable
|
||||
|
||||
@Serializable
|
||||
data class EventDto (
|
||||
val type: String
|
||||
)
|
||||
@@ -0,0 +1,134 @@
|
||||
package org.pavloveugene.iot.backend.routes
|
||||
|
||||
import io.ktor.http.HttpStatusCode
|
||||
import io.ktor.http.content.PartData
|
||||
import io.ktor.http.content.forEachPart
|
||||
import io.ktor.http.content.streamProvider
|
||||
import io.ktor.server.application.call
|
||||
import io.ktor.server.request.receiveMultipart
|
||||
import io.ktor.server.response.respond
|
||||
import io.ktor.server.response.respondFile
|
||||
import io.ktor.server.routing.Route
|
||||
import io.ktor.server.routing.get
|
||||
import io.ktor.server.routing.post
|
||||
import org.pavloveugene.iot.backend.db.Database
|
||||
import java.io.File
|
||||
|
||||
fun Route.firmwareRouting() {
|
||||
uploadFirmware()
|
||||
getFirmware()
|
||||
post("/ota_trigger") {
|
||||
// send WS command
|
||||
}
|
||||
}
|
||||
|
||||
fun Route.uploadFirmware() {
|
||||
post("/firmware_upload") {
|
||||
val multipart = call.receiveMultipart()
|
||||
|
||||
var deviceId: Long? = null
|
||||
var version: Int
|
||||
var fileBytes: ByteArray? = null
|
||||
|
||||
multipart.forEachPart { part ->
|
||||
when (part) {
|
||||
is PartData.FormItem -> {
|
||||
when (part.name) {
|
||||
"device_id" -> deviceId = part.value.toLong()
|
||||
}
|
||||
}
|
||||
|
||||
is PartData.FileItem -> {
|
||||
fileBytes = part.streamProvider().readBytes()
|
||||
}
|
||||
|
||||
else -> {}
|
||||
|
||||
}
|
||||
|
||||
part.dispose()
|
||||
}
|
||||
|
||||
if (deviceId == null || fileBytes == null) {
|
||||
call.respond(HttpStatusCode.BadRequest, "Missing fields")
|
||||
return@post
|
||||
}
|
||||
|
||||
// 👉 генерим имя файла
|
||||
val filename = "${java.util.UUID.randomUUID()}.bin"
|
||||
val path = "storage/firmware/$filename"
|
||||
|
||||
// 👉 сохраняем файл
|
||||
java.io.File(path).apply {
|
||||
parentFile.mkdirs()
|
||||
writeBytes(fileBytes!!)
|
||||
}
|
||||
|
||||
// 👉 считаем sha256
|
||||
val sha256 = java.security.MessageDigest
|
||||
.getInstance("SHA-256")
|
||||
.digest(fileBytes!!)
|
||||
.joinToString("") { "%02x".format(it) }
|
||||
|
||||
// 👉 сохраняем в БД
|
||||
while (true) {
|
||||
version = ((Database.queryOne(
|
||||
"""
|
||||
select coalesce(max(version), 0) + 1 as v
|
||||
from firmware
|
||||
where device_id = ?
|
||||
""",
|
||||
listOf(deviceId)
|
||||
)?.get("v") as Number?)?.toInt() ?: 1)
|
||||
|
||||
try {
|
||||
Database.execute(
|
||||
"""
|
||||
insert into firmware (device_id, version, path, sha256, size)
|
||||
values (?, ?, ?, ?, ?)
|
||||
""", listOf(
|
||||
deviceId,
|
||||
version,
|
||||
path,
|
||||
sha256,
|
||||
fileBytes.size
|
||||
)
|
||||
)
|
||||
|
||||
break
|
||||
} catch (e: Exception) {
|
||||
println("Retry insert firmware: ${e.message}")
|
||||
}
|
||||
}
|
||||
|
||||
call.respond(
|
||||
mapOf(
|
||||
"status" to "ok",
|
||||
"filename" to filename,
|
||||
"version" to version,
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fun Route.getFirmware() {
|
||||
get("/firmware/download") {
|
||||
val id = call.parameters["id"]!!.toInt()
|
||||
|
||||
val row = Database.queryOne(
|
||||
"select path from firmware where id=?",
|
||||
listOf(id)
|
||||
)
|
||||
if (row == null) {
|
||||
return@get call.respond(HttpStatusCode.NotFound)
|
||||
}
|
||||
|
||||
val path = row["path"] as String
|
||||
val file = File(path)
|
||||
|
||||
if (!file.exists()) {
|
||||
return@get call.respond(HttpStatusCode.NotFound)
|
||||
}
|
||||
call.respondFile(file)
|
||||
}
|
||||
}
|
||||
@@ -7,29 +7,20 @@ import io.ktor.server.response.*
|
||||
import io.ktor.server.routing.*
|
||||
import io.ktor.http.*
|
||||
import kotlinx.serialization.json.Json
|
||||
import org.pavloveugene.iot.backend.dto.TelemetryDto
|
||||
import kotlinx.serialization.json.decodeFromJsonElement
|
||||
import org.pavloveugene.iot.backend.config.AppConfig
|
||||
import org.pavloveugene.iot.backend.services.ProtocolService
|
||||
import java.time.ZonedDateTime
|
||||
import java.time.ZoneOffset
|
||||
import java.time.format.DateTimeFormatter
|
||||
|
||||
fun Route.protocolRoutes() {
|
||||
route(AppConfig.apiPrefix+"/protocol") {
|
||||
|
||||
route(AppConfig.apiPrefix + "/protocol") {
|
||||
|
||||
get("/health") {
|
||||
call.respond(HttpStatusCode.OK, mapOf("status" to "ok"))
|
||||
}
|
||||
|
||||
post("/message") {
|
||||
val json = Json {
|
||||
ignoreUnknownKeys = false
|
||||
}
|
||||
|
||||
val protocolService = ProtocolService(json)
|
||||
|
||||
val msg = call.receive<MessageDto>()
|
||||
|
||||
protocolService.handleMessage(msg)
|
||||
|
||||
call.respond(HttpStatusCode.Accepted)
|
||||
call.respond("ok")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -4,20 +4,12 @@ import MessageDto
|
||||
import io.ktor.server.websocket.*
|
||||
import io.ktor.server.routing.*
|
||||
import io.ktor.websocket.*
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.server.response.*
|
||||
import io.ktor.utils.io.CancellationException
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.json.JsonObject
|
||||
import kotlinx.serialization.decodeFromString
|
||||
import kotlinx.serialization.encodeToString
|
||||
import kotlinx.serialization.json.encodeToJsonElement
|
||||
import org.pavloveugene.iot.backend.config.AppConfig
|
||||
import org.pavloveugene.iot.backend.dto.ProtocolMessage
|
||||
import org.pavloveugene.iot.backend.dto.TelemetryDto
|
||||
import org.pavloveugene.iot.backend.services.DeviceConnections
|
||||
import org.pavloveugene.iot.backend.services.ProtocolService
|
||||
import java.io.IOException
|
||||
import java.time.Duration
|
||||
|
||||
fun Route.protocolWebSocket() {
|
||||
val json = Json {
|
||||
@@ -27,8 +19,11 @@ fun Route.protocolWebSocket() {
|
||||
val protocolService = ProtocolService(json)
|
||||
|
||||
webSocket(AppConfig.wsPath) {
|
||||
|
||||
println("WS connected")
|
||||
|
||||
var devId: UInt? = null
|
||||
|
||||
try {
|
||||
for (frame in incoming) {
|
||||
if (frame is Frame.Text) {
|
||||
@@ -43,7 +38,8 @@ fun Route.protocolWebSocket() {
|
||||
}
|
||||
|
||||
try {
|
||||
protocolService.handleMessage(msg)
|
||||
devId = msg.d
|
||||
protocolService.handleMessage(msg, this)
|
||||
} catch (e: Exception) {
|
||||
println("WS handler error: ${e.message}")
|
||||
}
|
||||
@@ -56,7 +52,10 @@ fun Route.protocolWebSocket() {
|
||||
} catch (e: Exception) {
|
||||
println("WS error: ${e.message}")
|
||||
} finally {
|
||||
println("WS disconnected")
|
||||
devId?.let {
|
||||
DeviceConnections.unregister(devId)
|
||||
println("WS disconnected: $it")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
package org.pavloveugene.iot.backend.routes
|
||||
|
||||
import io.ktor.http.HttpStatusCode
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.server.response.*
|
||||
import io.ktor.server.routing.*
|
||||
import org.pavloveugene.iot.backend.db.Database
|
||||
|
||||
fun Application.configureRouting() {
|
||||
routing {
|
||||
get("/api/v1/health") {
|
||||
try {
|
||||
Database.dataSource.connection.use { conn ->
|
||||
conn.createStatement().execute("SELECT 1")
|
||||
}
|
||||
call.respondText("OK")
|
||||
} catch (e: Exception) {
|
||||
call.respond(HttpStatusCode.InternalServerError, "DB ERROR")
|
||||
}
|
||||
}
|
||||
get("/api/v1/ready") {
|
||||
// например:
|
||||
// - миграции применены
|
||||
// - WS сервис готов
|
||||
call.respondText("READY")
|
||||
}
|
||||
get("/live") {
|
||||
call.respondText("ALIVE")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,4 +0,0 @@
|
||||
package org.pavloveugene.iot.backend.routes
|
||||
|
||||
class WsRoutes {
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package org.pavloveugene.iot.backend.services
|
||||
|
||||
import org.pavloveugene.iot.backend.db.Database
|
||||
import java.sql.SQLException
|
||||
|
||||
fun executeCleanup() {
|
||||
val ds = Database.dataSource
|
||||
val start = System.currentTimeMillis()
|
||||
val cutoff = start / 1000 - 60 * 60 * 24 * 2
|
||||
|
||||
println("Begin cleanup")
|
||||
|
||||
ds.connection.use { conn ->
|
||||
conn.autoCommit = false
|
||||
|
||||
try {
|
||||
var total = 0
|
||||
|
||||
do {
|
||||
val deleted = conn.prepareStatement("""
|
||||
delete t
|
||||
from telemetry t
|
||||
join (
|
||||
select id
|
||||
from telemetry
|
||||
where ts < ? and processed = true
|
||||
limit 1000
|
||||
) p on p.id = t.id
|
||||
""".trimIndent()).use { ps ->
|
||||
ps.setLong(1, cutoff)
|
||||
ps.executeUpdate()
|
||||
}
|
||||
|
||||
total += deleted
|
||||
|
||||
if (deleted > 0) {
|
||||
println("Deleted $deleted rows (total $total)")
|
||||
}
|
||||
|
||||
conn.commit()
|
||||
|
||||
} while (deleted > 0)
|
||||
|
||||
} catch (e: SQLException) {
|
||||
conn.rollback()
|
||||
throw RuntimeException("Error during cleanup", e)
|
||||
} finally {
|
||||
conn.autoCommit = true
|
||||
}
|
||||
}
|
||||
|
||||
println("Cleanup complete in ${System.currentTimeMillis() - start} ms")
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
package org.pavloveugene.iot.backend.services
|
||||
|
||||
import io.ktor.websocket.WebSocketSession
|
||||
|
||||
data class DeviceConnection(
|
||||
val session: WebSocketSession,
|
||||
var lastSeen: Long
|
||||
)
|
||||
@@ -0,0 +1,18 @@
|
||||
package org.pavloveugene.iot.backend.services
|
||||
|
||||
import io.ktor.websocket.WebSocketSession
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
object DeviceConnections {
|
||||
private val map = ConcurrentHashMap<UInt, DeviceConnection>()
|
||||
|
||||
fun register(deviceId: UInt, connection: DeviceConnection) {
|
||||
map[deviceId] = connection
|
||||
}
|
||||
|
||||
fun unregister(deviceId: UInt) {
|
||||
map.remove(deviceId)
|
||||
}
|
||||
|
||||
fun get(deviceId: UInt): DeviceConnection? = map[deviceId]
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
package org.pavloveugene.iot.backend.services
|
||||
|
||||
import io.ktor.serialization.kotlinx.json.json
|
||||
import io.ktor.server.application.install
|
||||
import io.ktor.server.engine.embeddedServer
|
||||
import io.ktor.server.netty.Netty
|
||||
import io.ktor.server.plugins.contentnegotiation.ContentNegotiation
|
||||
import io.ktor.server.routing.routing
|
||||
import io.ktor.server.websocket.WebSockets
|
||||
import io.ktor.server.websocket.pingPeriod
|
||||
import io.ktor.server.websocket.timeout
|
||||
import io.netty.handler.codec.compression.StandardCompressionOptions.gzip
|
||||
import org.pavloveugene.iot.backend.config.AppConfig
|
||||
import org.pavloveugene.iot.backend.routes.protocolRoutes
|
||||
import org.pavloveugene.iot.backend.routes.protocolWebSocket
|
||||
import java.time.Duration
|
||||
import io.ktor.server.plugins.compression.*
|
||||
import org.pavloveugene.iot.backend.routes.firmwareRouting
|
||||
|
||||
fun startKtorServer() {
|
||||
val server = embeddedServer(
|
||||
Netty,
|
||||
port = AppConfig.serverPort,
|
||||
host = AppConfig.serverHost,
|
||||
) {
|
||||
install(ContentNegotiation) {
|
||||
json()
|
||||
}
|
||||
|
||||
install(WebSockets) {
|
||||
pingPeriod = Duration.ofSeconds(15)
|
||||
timeout = Duration.ofSeconds(30)
|
||||
maxFrameSize = Long.MAX_VALUE
|
||||
masking = false
|
||||
}
|
||||
|
||||
routing {
|
||||
protocolRoutes()
|
||||
protocolWebSocket()
|
||||
firmwareRouting()
|
||||
}
|
||||
}
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(Thread {
|
||||
println("Shutting down...")
|
||||
try {
|
||||
server.stop(1000, 2000)
|
||||
} catch (e: Exception) {
|
||||
println("Shutdown error: ${e.message}")
|
||||
}
|
||||
})
|
||||
|
||||
server.start(wait = true)
|
||||
}
|
||||
@@ -0,0 +1,187 @@
|
||||
package org.pavloveugene.iot.backend.services
|
||||
|
||||
import org.pavloveugene.iot.backend.db.Database
|
||||
|
||||
fun runNormalizeLoop() {
|
||||
var total = 0
|
||||
val start = System.currentTimeMillis()
|
||||
verifyData()
|
||||
do {
|
||||
val count = normalizeBatch()
|
||||
total += count
|
||||
println("Processed batch: $count")
|
||||
} while (count > 0)
|
||||
|
||||
println("Done. Total processed: $total in ${System.currentTimeMillis() - start} ms")
|
||||
|
||||
}
|
||||
|
||||
fun normalizeBatch(): Int {
|
||||
val ds = Database.dataSource
|
||||
|
||||
var count = 0
|
||||
|
||||
ds.connection.use { conn ->
|
||||
|
||||
conn.autoCommit = false
|
||||
|
||||
try {
|
||||
|
||||
conn.createStatement().use { stmt ->
|
||||
stmt.executeUpdate(
|
||||
"""
|
||||
create temporary table pack as
|
||||
select t.id
|
||||
from telemetry t
|
||||
where t.processed=false and t.defective=false
|
||||
order by t.id
|
||||
limit 1000
|
||||
""".trimIndent()
|
||||
)
|
||||
|
||||
stmt.executeQuery("select count(0) from pack").use { rs ->
|
||||
rs.next()
|
||||
count = rs.getInt(1)
|
||||
}
|
||||
|
||||
if (count > 0) {
|
||||
|
||||
stmt.executeUpdate(
|
||||
"""
|
||||
INSERT INTO telemetry_data (device_id, ts, metric, source, value, unit)
|
||||
SELECT t.device_id
|
||||
, t.ts * 1000 + CAST(JSON_UNQUOTE(JSON_EXTRACT(t.payload, CONCAT('$[', seq.i, '][0]'))) AS double)
|
||||
, t.metric
|
||||
, t.source
|
||||
, CAST(JSON_UNQUOTE(JSON_EXTRACT(t.payload, CONCAT('$[', seq.i, '][1]'))) AS DOUBLE) * coalesce(u.multiplier, 1)
|
||||
, coalesce(u.target_unit, t.unit)
|
||||
FROM pack p
|
||||
join telemetry t on t.id = p.id
|
||||
JOIN ( SELECT 0 AS i
|
||||
UNION ALL SELECT 1
|
||||
UNION ALL SELECT 2
|
||||
UNION ALL SELECT 3
|
||||
UNION ALL SELECT 4
|
||||
UNION ALL SELECT 5
|
||||
UNION ALL SELECT 6
|
||||
UNION ALL SELECT 7
|
||||
UNION ALL SELECT 8
|
||||
UNION ALL SELECT 9
|
||||
UNION ALL SELECT 10
|
||||
UNION ALL SELECT 11
|
||||
UNION ALL SELECT 12
|
||||
UNION ALL SELECT 13
|
||||
UNION ALL SELECT 14
|
||||
UNION ALL SELECT 15
|
||||
UNION ALL SELECT 16
|
||||
UNION ALL SELECT 17
|
||||
UNION ALL SELECT 18
|
||||
UNION ALL SELECT 19
|
||||
UNION ALL SELECT 20
|
||||
UNION ALL SELECT 21
|
||||
UNION ALL SELECT 22
|
||||
UNION ALL SELECT 23
|
||||
UNION ALL SELECT 24
|
||||
UNION ALL SELECT 25
|
||||
UNION ALL SELECT 26
|
||||
UNION ALL SELECT 27
|
||||
UNION ALL SELECT 28
|
||||
UNION ALL SELECT 29
|
||||
UNION ALL SELECT 30
|
||||
UNION ALL SELECT 31
|
||||
) AS seq ON seq.i < JSON_LENGTH(t.payload)
|
||||
left join units u on u.unit = t.unit
|
||||
WHERE JSON_EXTRACT(t.payload, CONCAT('$[', seq.i, ']')) IS NOT NULL
|
||||
""".trimIndent()
|
||||
)
|
||||
|
||||
stmt.executeUpdate(
|
||||
"""
|
||||
update
|
||||
telemetry t
|
||||
join pack p on t.id = p.id
|
||||
set t.processed = true
|
||||
"""
|
||||
)
|
||||
}
|
||||
|
||||
stmt.executeUpdate("drop temporary table pack")
|
||||
|
||||
conn.commit()
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
conn.rollback()
|
||||
throw RuntimeException("Error during normalize", e)
|
||||
} finally {
|
||||
conn.autoCommit = true
|
||||
}
|
||||
}
|
||||
|
||||
return count
|
||||
}
|
||||
|
||||
fun verifyData(): Boolean {
|
||||
val ds = Database.dataSource
|
||||
var ret = true;
|
||||
|
||||
println("Executing verification")
|
||||
|
||||
ds.connection.use { conn ->
|
||||
|
||||
conn.autoCommit = false
|
||||
|
||||
try {
|
||||
conn.createStatement().use { stmt ->
|
||||
stmt.executeUpdate(
|
||||
"""
|
||||
create temporary table err as
|
||||
select t.id
|
||||
, case
|
||||
when t.unit = '' then 'No unit'
|
||||
else 'Invalid unit'
|
||||
end reason
|
||||
from telemetry t
|
||||
where t.processed=false and t.defective=false
|
||||
and (t.unit = '' or (
|
||||
t.unit not in (select unit from units)
|
||||
and t.unit not in (select target_unit from units)
|
||||
and t.unit not in ('v', 'a', 'm', 'g', 'kg', 'raw')
|
||||
))
|
||||
""".trimIndent()
|
||||
)
|
||||
|
||||
stmt.executeQuery("select count(0) from err").use { rs ->
|
||||
rs.next()
|
||||
val count = rs.getInt(1)
|
||||
ret = count == 0
|
||||
if (ret) {
|
||||
println("All ok!")
|
||||
} else {
|
||||
println("$count errors detected")
|
||||
}
|
||||
}
|
||||
|
||||
stmt.executeUpdate(
|
||||
"""
|
||||
update telemetry t
|
||||
join err e on e.id=t.id
|
||||
set t.defective=true, t.defective_reason=e.reason
|
||||
""".trimIndent()
|
||||
)
|
||||
|
||||
stmt.executeUpdate("drop temporary table err")
|
||||
}
|
||||
|
||||
conn.commit()
|
||||
|
||||
} catch (e: Exception) {
|
||||
conn.rollback()
|
||||
} finally {
|
||||
conn.autoCommit = true
|
||||
}
|
||||
}
|
||||
|
||||
println("Verification complete")
|
||||
|
||||
return ret
|
||||
}
|
||||
@@ -1,17 +1,20 @@
|
||||
package org.pavloveugene.iot.backend.services
|
||||
|
||||
import MessageDto
|
||||
import io.ktor.server.websocket.WebSocketServerSession
|
||||
import io.ktor.websocket.WebSocketSession
|
||||
import kotlinx.serialization.builtins.ListSerializer
|
||||
import kotlinx.serialization.builtins.serializer
|
||||
import kotlinx.serialization.json.Json
|
||||
import org.pavloveugene.iot.backend.db.Database
|
||||
import org.pavloveugene.iot.backend.dto.EventDto
|
||||
import org.pavloveugene.iot.backend.dto.TelemetryDto
|
||||
|
||||
class ProtocolService(
|
||||
private val json: Json
|
||||
) {
|
||||
|
||||
fun handleMessage(msg: MessageDto) {
|
||||
fun handleMessage(msg: MessageDto, session: WebSocketSession) {
|
||||
when (msg.t) {
|
||||
MessageType.TELEMETRY -> {
|
||||
handleTelemetry(msg)
|
||||
@@ -20,6 +23,7 @@ class ProtocolService(
|
||||
MessageType.EVENT -> {
|
||||
println("=== EVENT ===")
|
||||
println(msg.p)
|
||||
handleEvent(msg, session)
|
||||
}
|
||||
|
||||
MessageType.COMMAND -> {
|
||||
@@ -29,6 +33,27 @@ class ProtocolService(
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleEvent(msg: MessageDto, session: WebSocketSession) {
|
||||
val payload = json.decodeFromJsonElement(EventDto.serializer(), msg.p)
|
||||
when (payload.type) {
|
||||
"HB" -> {
|
||||
println("=== HB devId = ${msg.d} ===")
|
||||
|
||||
val connection = DeviceConnections.get(msg.d)
|
||||
if (connection == null) {
|
||||
DeviceConnections.register(
|
||||
msg.d, DeviceConnection(
|
||||
session = session,
|
||||
lastSeen = System.currentTimeMillis()
|
||||
)
|
||||
)
|
||||
} else {
|
||||
connection.lastSeen = System.currentTimeMillis()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun handleTelemetry(msg: MessageDto) {
|
||||
val ds = Database.dataSource
|
||||
|
||||
@@ -37,19 +62,23 @@ class ProtocolService(
|
||||
|
||||
try {
|
||||
// ensure device
|
||||
conn.prepareStatement("""
|
||||
conn.prepareStatement(
|
||||
"""
|
||||
INSERT INTO devices(id)
|
||||
VALUES (?)
|
||||
ON DUPLICATE KEY UPDATE id = id
|
||||
""").use {
|
||||
"""
|
||||
).use {
|
||||
it.setLong(1, msg.d.toLong())
|
||||
it.executeUpdate()
|
||||
}
|
||||
|
||||
// check enabled
|
||||
val isEnabled = conn.prepareStatement("""
|
||||
val isEnabled = conn.prepareStatement(
|
||||
"""
|
||||
SELECT is_enabled FROM devices WHERE id = ?
|
||||
""").use {
|
||||
"""
|
||||
).use {
|
||||
it.setLong(1, msg.d.toLong())
|
||||
val rs = it.executeQuery()
|
||||
if (rs.next()) rs.getBoolean(1) else false
|
||||
@@ -70,18 +99,21 @@ class ProtocolService(
|
||||
println("values=${payload.v}")
|
||||
|
||||
// insert telemetry
|
||||
conn.prepareStatement("""
|
||||
conn.prepareStatement(
|
||||
"""
|
||||
INSERT INTO telemetry(
|
||||
device_id, ts, metric, source, unit, payload
|
||||
) VALUES (?, ?, ?, ?, ?, ?)
|
||||
""").use {
|
||||
"""
|
||||
).use {
|
||||
it.setLong(1, msg.d.toLong())
|
||||
it.setLong(2, msg.ts)
|
||||
it.setString(3, payload.m)
|
||||
it.setString(4, payload.s)
|
||||
it.setString(5, payload.u)
|
||||
|
||||
val payloadJson = json.encodeToString( ListSerializer(ListSerializer(Double.serializer())),payload.v)
|
||||
val payloadJson =
|
||||
json.encodeToString(ListSerializer(ListSerializer(Double.serializer())), payload.v)
|
||||
it.setString(6, payloadJson)
|
||||
|
||||
it.executeUpdate()
|
||||
|
||||
Reference in New Issue
Block a user