package couchdb.changes
import akka.actor.{ActorRef, Actor, ActorLogging, Status}
import akka.io.IO
import scala.concurrent.ExecutionContext.Implicits.global
import scala.language.postfixOps
import scala.language.reflectiveCalls
import scala.util.Try
import spray.can.Http
import spray.can.Http.ConnectionAttemptFailedException
import spray.client.pipelining._
import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
import spray.http._
import spray.json._
/**
 * One row of the CouchDB `_changes` feed, as decoded from JSON by the
 * unmarshaller in this file.
 *
 * @param seq sequence number carried by the row's `seq` field
 * @param id  value of the row's `id` field
 * @param doc JSON object found under the row's `doc` field
 */
case class ResultRow(
  seq: Int,
  id: String,
  doc: JsObject
)
/**
 * Mixin that brings an implicit spray `Unmarshaller` for [[ResultRow]] into
 * scope.
 */
trait ChangesMarshaller {

  /**
   * Unmarshaller that reads an entity as UTF-8 text, parses it as a JSON
   * object and converts that object into a [[ResultRow]].
   */
  implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
    import DefaultJsonProtocol._
    import scala.util.{Failure, Success}

    /** spray-json format mapping the `seq`, `id` and `doc` fields of a row. */
    implicit val changesFormat: RootJsonFormat[ResultRow] = jsonFormat3(ResultRow)

    /**
     * Decodes `entity` into a [[ResultRow]].
     *
     * @param entity HTTP entity whose body is expected to hold one JSON object
     * @return `Right(row)` on success; `Left(MalformedContent("bad json", ...))`
     *         when parsing or conversion throws a non-fatal exception. The
     *         exception is attached as the error's cause so callers can
     *         report why decoding failed.
     */
    def apply(entity: HttpEntity): Deserialized[ResultRow] = {
      val decoded = Try {
        JsonParser(entity.asString(HttpCharsets.`UTF-8`)).asJsObject.convertTo[ResultRow]
      }
      decoded match {
        case Success(row) => Right(row)
        // Keep the original message but preserve the underlying exception.
        case Failure(cause) => Left(MalformedContent("bad json", Some(cause)))
      }
    }
  }
}
/**
 * Actor that requests the CouchDB `_changes` feed in continuous mode and
 * forwards every successfully decoded [[ResultRow]] to `processor`.
 *
 * Messages handled:
 *  - `Start`: sends the `_changes` request through spray-can, with responses
 *    delivered back to this actor.
 *  - `ChunkedResponseStart`: logged at debug level.
 *  - `MessageChunk`: decoded and forwarded to `processor`; chunks that fail
 *    to decode are logged at warning level and dropped.
 *  - `Status.Failure`: logged; a `ConnectionAttemptFailedException`
 *    additionally schedules a new `Start` after `couchConf.reconnect`.
 *  - anything else: logged at debug level.
 *
 * @param processor actor that receives each decoded [[ResultRow]]
 */
class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
  with ChangesMarshaller with settings {

  /** spray-can HTTP extension actor the request is sent to. */
  val io = IO(Http)(context.system)

  /** Configuration of the enclosing actor system. */
  def config = context.system.settings.config

  def receive: Receive = {
    case Start =>
      val rq = changesRequest
      log.debug("Sending request {}", rq)
      sendTo(io).withResponsesReceivedBy(self)(rq)

    case start @ ChunkedResponseStart(_) =>
      log.debug("Received {}", start)

    case MessageChunk(data, _) =>
      log.debug("Received data chunk {}", data)
      if (data.length > 2) // skip heartbeat messages
        ChangesMarshaller(data).fold(
          _ => log.warning("Failed to deserialize entity {}", data),
          row => processor ! row
        )

    case Status.Failure(t) =>
      handleFailure(t)

    case other =>
      log.debug("Received {}", other)
  }

  /**
   * Builds the GET request for `${couchConf.url}/_changes` with a continuous
   * feed, the configured heartbeat (in milliseconds) and documents included.
   */
  private def changesRequest: HttpRequest = {
    val uri = Uri(s"${couchConf.url}/_changes").withQuery(
      "feed" -> "continuous",
      "heartbeat" -> couchConf.heartbeat.toMillis.toString,
      "include_docs" -> "true"
    )
    HttpRequest(HttpMethods.GET, uri = uri)
  }

  /**
   * Logs a failure and, when the connection attempt itself failed, schedules
   * a reconnect by sending `Start` to this actor after `couchConf.reconnect`.
   */
  private def handleFailure(t: Throwable): Unit = t match {
    case _: ConnectionAttemptFailedException =>
      // Expected while CouchDB is unreachable: keep the log line short.
      log.error("Received failure {}", t)
      log.info("Scheduling reconnect to couchdb in {}", couchConf.reconnect)
      context.system.scheduler.scheduleOnce(couchConf.reconnect, self, Start)
    case _ =>
      // Unexpected failure: pass the throwable as the cause so the stack
      // trace reaches the log instead of only its toString.
      log.error(t, "Received failure {}", t)
  }
}
// vim: set ts=2 sw=2 et: