src/main/scala/CouchStream.scala
changeset 0 a279a342bc78
child 4 b25c02bd6b11
equal deleted inserted replaced
-1:000000000000 0:a279a342bc78
       
     1 package couchdb.changes
       
     2 
       
     3 import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
       
     4 import spray.http._
       
     5 import spray.json._
       
     6 import spray.client.pipelining._
       
     7 import akka.actor.{ActorRef, Actor, ActorLogging}
       
     8 import scala.util.Try
       
     9 import spray.can.Http
       
    10 import akka.io.IO
       
    11 
       
    12 case class ResultRow(seq: Int, id: String, doc: JsObject)
       
    13 
       
    14 trait ChangesMarshaller {
       
    15 
       
    16   implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
       
    17     import DefaultJsonProtocol._
       
    18     implicit val changesFormat = jsonFormat3(ResultRow)
       
    19 
       
    20     def apply(entity: HttpEntity): Deserialized[ResultRow] = (Try {
       
    21       Right(JsonParser(entity.asString).asJsObject.convertTo[ResultRow])
       
    22     }).getOrElse(Left(MalformedContent("bad json")))
       
    23 
       
    24   }
       
    25 }
       
    26 
       
    27 class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
       
    28   with ChangesMarshaller {
       
    29 
       
    30   val io = IO(Http)(context.system)
       
    31 
       
    32   def receive: Receive = {
       
    33     case uri: Uri =>
       
    34       val rq = HttpRequest(HttpMethods.GET, uri = uri)
       
    35       log.debug("Sending request {}", rq)
       
    36       sendTo(io).withResponsesReceivedBy(self)(rq)
       
    37     case x@ChunkedResponseStart(_) =>
       
    38       log.debug("Received {}", x)
       
    39     case MessageChunk(data, _) =>
       
    40       log.debug("Received data chunk {}", data)
       
    41       if (data.length > 2) // skip heartbeat messages
       
    42         ChangesMarshaller(data).fold(
       
    43           _ => log.warning("Failed to deserialize entity {}", data),
       
    44           processor !
       
    45         )
       
    46     case x =>
       
    47       log.debug("Received {}", x)
       
    48   }
       
    49 }
       
    50 
       
    51 
       
    52 // vim: set ts=2 sw=2 et: