package couchdb.changes
import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
import spray.http._
import spray.json._
import spray.client.pipelining._
import akka.actor.{ActorRef, Actor, ActorLogging}
import scala.util.Try
import spray.can.Http
import akka.io.IO
/**
 * One decoded row, built from a JSON object by the `jsonFormat3`-derived
 * format in [[ChangesMarshaller]]; the JSON field names therefore have to
 * match the parameter names below.
 *
 * @param seq value of the row's `seq` field
 * @param id  value of the row's `id` field
 * @param doc value of the row's `doc` field, kept as an unparsed JSON object
 */
case class ResultRow(
  seq: Int,
  id: String,
  doc: JsObject
)
/**
 * Mix-in that brings an implicit spray `Unmarshaller` for [[ResultRow]]
 * into scope.
 */
trait ChangesMarshaller {

  /**
   * Unmarshaller that reads an HTTP entity as a string, parses it as a single
   * JSON object and converts that object into a [[ResultRow]].
   */
  implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
    import DefaultJsonProtocol._
    import scala.util.{Failure, Success}

    /** spray-json format for [[ResultRow]]; explicitly typed so implicit resolution is predictable. */
    implicit val changesFormat: RootJsonFormat[ResultRow] = jsonFormat3(ResultRow)

    /**
     * Deserializes `entity` into a [[ResultRow]].
     *
     * @param entity the HTTP entity whose string content is expected to be one JSON object
     * @return `Right(row)` on success; `Left(MalformedContent("bad json", Some(cause)))`
     *         when parsing or conversion throws a non-fatal exception
     */
    def apply(entity: HttpEntity): Deserialized[ResultRow] =
      Try(JsonParser(entity.asString).asJsObject.convertTo[ResultRow]) match {
        case Success(row) =>
          Right(row)
        case Failure(cause) =>
          // Keep the original message but attach the underlying exception, so that
          // callers can tell a syntax error from a missing/ill-typed field.
          Left(MalformedContent("bad json", Some(cause)))
      }
  }
}
/**
 * Actor that requests a URI over HTTP and passes every successfully decoded
 * response chunk on to `processor` as a [[ResultRow]].
 *
 * Message handling:
 *  - `Uri`: sends a GET request for it through the `IO(Http)` manager, with
 *    responses delivered back to this actor.
 *  - `ChunkedResponseStart`: logged at debug level only.
 *  - `MessageChunk`: logged; chunks whose length is greater than 2 are
 *    unmarshalled and the resulting row is sent to `processor`, while chunks
 *    that fail to unmarshal are reported with a warning.
 *  - anything else: logged at debug level only.
 *
 * NOTE(review): each chunk is unmarshalled on its own, so this presumably
 * relies on every chunk carrying exactly one complete JSON document - confirm
 * against the server's chunking behaviour.
 *
 * @param processor actor that receives the decoded [[ResultRow]] messages
 */
class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
  with ChangesMarshaller {

  /** Manager actor of the spray-can HTTP extension for this actor's system. */
  val io: ActorRef = IO(Http)(context.system)

  def receive: Receive = {
    case uri: Uri =>
      val request = HttpRequest(HttpMethods.GET, uri = uri)
      log.debug("Sending request {}", request)
      sendTo(io).withResponsesReceivedBy(self)(request)

    case start@ChunkedResponseStart(_) =>
      log.debug("Received {}", start)

    case MessageChunk(data, _) =>
      log.debug("Received data chunk {}", data)
      if (data.length > 2) { // skip heartbeat messages
        ChangesMarshaller(data) match {
          case Right(row) =>
            processor ! row
          case Left(_) =>
            log.warning("Failed to deserialize entity {}", data)
        }
      }

    case other =>
      log.debug("Received {}", other)
  }
}
// vim: set ts=2 sw=2 et: