src/main/scala/CouchStream.scala
author Tomas Zeman <tzeman@volny.cz>
Thu, 05 May 2016 16:26:07 +0200
changeset 5 141e76d946e3
parent 4 b25c02bd6b11
permissions -rw-r--r--
CouchStream: process entity as UTF-8

package couchdb.changes

import akka.actor.{ActorRef, Actor, ActorLogging, Status}
import akka.io.IO

import scala.concurrent.ExecutionContext.Implicits.global
import scala.language.postfixOps
import scala.language.reflectiveCalls
import scala.util.Try

import spray.can.Http
import spray.can.Http.ConnectionAttemptFailedException
import spray.client.pipelining._
import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
import spray.http._
import spray.json._

/**
 * A single row of a change feed, as decoded by the JSON format declared in
 * `ChangesMarshaller` (`jsonFormat3(ResultRow)`).
 *
 * @param seq value of the row's `seq` member
 * @param id  value of the row's `id` member
 * @param doc value of the row's `doc` member, kept as a raw JSON object
 */
case class ResultRow(
  seq: Int,
  id: String,
  doc: JsObject
)

/**
 * Mix-in providing an implicit spray [[Unmarshaller]] that turns an HTTP
 * entity containing one JSON object into a [[ResultRow]].
 */
trait ChangesMarshaller {

  implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
    import DefaultJsonProtocol._
    import scala.util.{Failure, Success}

    /** spray-json format used to convert the parsed JSON object to a [[ResultRow]]. */
    implicit val changesFormat: RootJsonFormat[ResultRow] = jsonFormat3(ResultRow)

    /**
     * Decodes `entity` as UTF-8 text, parses it as JSON and converts the
     * resulting object to a [[ResultRow]].
     *
     * @param entity HTTP entity expected to hold exactly one JSON object
     * @return `Right(row)` on success; `Left(MalformedContent("bad json", cause))`
     *         when parsing or conversion throws a non-fatal exception. The
     *         original exception is preserved in `cause` so that callers can
     *         report why the payload was rejected.
     */
    def apply(entity: HttpEntity): Deserialized[ResultRow] = {
      val parsed = Try {
        JsonParser(entity.asString(HttpCharsets.`UTF-8`)).asJsObject.
          convertTo[ResultRow]
      }
      parsed match {
        case Success(row) => Right(row)
        case Failure(e)   => Left(MalformedContent("bad json", Some(e)))
      }
    }

  }
}

/**
 * Actor that issues a continuous `_changes` request against the configured
 * CouchDB URL and forwards every successfully deserialized [[ResultRow]] to
 * `processor`.
 *
 * Handled messages:
 *  - `Start`: builds and sends the `_changes` request; responses are delivered
 *    back to this actor.
 *  - `MessageChunk`: deserializes the chunk payload and forwards the row.
 *  - `Status.Failure`: logs the failure; on a failed connection attempt a new
 *    `Start` is scheduled after `couchConf.reconnect`.
 *  - anything else is logged at debug level and otherwise ignored.
 *
 * @param processor recipient of the decoded change rows
 */
class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
  with ChangesMarshaller with settings {

  /** spray-can HTTP extension actor the request is sent to. */
  val io = IO(Http)(context.system)

  def config = context.system.settings.config

  def receive: Receive = {
    case Start =>
      val uri = Uri(s"${couchConf.url}/_changes").withQuery(
          "feed" -> "continuous"
        , "heartbeat" -> couchConf.heartbeat.toMillis.toString
        , "include_docs" -> "true"
      )
      val rq = HttpRequest(HttpMethods.GET, uri = uri)
      log.debug("Sending request {}", rq)
      sendTo(io).withResponsesReceivedBy(self)(rq)

    case x@ChunkedResponseStart(_) =>
      log.debug("Received {}", x)

    case MessageChunk(data, _) =>
      log.debug("Received data chunk {}", data)
      // NOTE(review): this treats every chunk as exactly one complete JSON
      // row; confirm the server never splits or coalesces rows across chunks.
      if (data.length > 2) // skip heartbeat messages
        ChangesMarshaller(data).fold(
          // Include the deserialization error so the reason is not lost.
          err => log.warning("Failed to deserialize entity {}: {}", data, err),
          row => processor ! row
        )

    case Status.Failure(t) =>
      // Pass the throwable as the cause so the stack trace is logged, not
      // just its toString.
      log.error(t, "Received failure {}", t)
      t match {
        case _: ConnectionAttemptFailedException =>
          log.info("Scheduling reconnect to couchdb in {}", couchConf.reconnect)
          // Schedule on the actor's own dispatcher instead of the global
          // execution context; the sender stays `self` as before.
          context.system.scheduler.scheduleOnce(couchConf.reconnect, self,
            Start)(context.dispatcher, self)
        case _ =>
      }

    case x =>
      log.debug("Received {}", x)
  }
}


// vim: set ts=2 sw=2 et: