src/main/scala/CouchStream.scala
author Tomas Zeman <tzeman@volny.cz>
Tue, 21 Jul 2015 09:24:52 +0200
changeset 0 a279a342bc78
child 4 b25c02bd6b11
permissions -rw-r--r--
Initial import

package couchdb.changes

import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
import spray.http._
import spray.json._
import spray.client.pipelining._
import akka.actor.{ActorRef, Actor, ActorLogging}
import scala.util.Try
import spray.can.Http
import akka.io.IO

/**
 * One decoded row of a changes feed.
 *
 * Instances are produced from JSON objects via `jsonFormat3`, so the JSON
 * field names are exactly `seq`, `id` and `doc`.
 *
 * @param seq value of the row's `seq` field (presumably the update sequence number -- TODO confirm)
 * @param id  value of the row's `id` field (presumably the document id -- TODO confirm)
 * @param doc value of the row's `doc` field, kept as an unparsed JSON object
 */
case class ResultRow(seq: Int, id: String, doc: JsObject)

/**
 * Mixin that brings an implicit spray `Unmarshaller` for [[ResultRow]] into scope.
 *
 * Despite the name, the contained object only ''un''marshals: it turns an
 * `HttpEntity` holding a single JSON object into a [[ResultRow]].
 */
trait ChangesMarshaller {

  /**
   * Deserializes an `HttpEntity` containing one JSON object into a [[ResultRow]].
   */
  implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
    import DefaultJsonProtocol._
    import scala.util.{Failure, Success}

    /** spray-json format mapping the JSON fields `seq`, `id` and `doc` onto [[ResultRow]]. */
    implicit val changesFormat: RootJsonFormat[ResultRow] = jsonFormat3(ResultRow)

    /**
     * Parses the entity body as JSON and converts it to a [[ResultRow]].
     *
     * @param entity entity whose body is expected to be a single JSON object
     * @return `Right(row)` on success; `Left(MalformedContent("bad json", cause))`
     *         when the body is not valid JSON, is not a JSON object, or lacks
     *         the fields required by [[ResultRow]]
     */
    def apply(entity: HttpEntity): Deserialized[ResultRow] =
      // JSON is UTF-8 by default; fall back to UTF-8 (not the entity's default
      // charset) when the entity carries no explicit charset, e.g. when it was
      // built from a raw data chunk.
      Try(JsonParser(entity.asString(HttpCharsets.`UTF-8`)).asJsObject.convertTo[ResultRow]) match {
        case Success(row) => Right(row)
        // Keep the original message but preserve the underlying parse error.
        case Failure(cause) => Left(MalformedContent("bad json", cause))
      }

  }
}

/**
 * Actor that issues a GET request for every `Uri` it receives and forwards
 * each successfully decoded [[ResultRow]] from the chunked response to
 * `processor`.
 *
 * Handled messages:
 *  - `Uri`: sends a GET request through the spray-can HTTP extension, with
 *    responses delivered back to this actor.
 *  - `ChunkedResponseStart`: logged at debug level only.
 *  - `MessageChunk`: chunks longer than 2 bytes are unmarshalled to a
 *    [[ResultRow]] and sent to `processor`; undecodable chunks are logged as
 *    warnings and dropped.
 *  - `Status.Failure`: logged at error level.
 *  - anything else: logged at debug level.
 *
 * @param processor recipient of every decoded [[ResultRow]]
 */
class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
  with ChangesMarshaller {

  /** Manager actor of the spray-can HTTP extension, used as request transport. */
  val io: ActorRef = IO(Http)(context.system)

  def receive: Receive = {
    case uri: Uri =>
      val rq = HttpRequest(HttpMethods.GET, uri = uri)
      log.debug("Sending request {}", rq)
      // Responses (including individual chunks) are delivered to this actor.
      sendTo(io).withResponsesReceivedBy(self)(rq)
    case x@ChunkedResponseStart(_) =>
      log.debug("Received {}", x)
    case MessageChunk(data, _) =>
      log.debug("Received data chunk {}", data)
      // NOTE(review): assumes every chunk carries exactly one complete JSON
      // row -- confirm against the server's chunking behaviour.
      if (data.length > 2) // skip heartbeat messages
        ChangesMarshaller(data) match {
          case Right(row) => processor ! row
          case Left(_) => log.warning("Failed to deserialize entity {}", data)
        }
    case akka.actor.Status.Failure(cause) =>
      // Request/connection errors must not disappear in debug-level noise.
      log.error(cause, "Changes request failed")
    case x =>
      log.debug("Received {}", x)
  }
}


// vim: set ts=2 sw=2 et: