|
|
@ -1,4 +1,5 @@ |
|
|
|
require "./ring.cr" |
|
|
|
require "json" |
|
|
|
require "mutex" |
|
|
|
require "openssl" |
|
|
|
|
|
|
@ -12,6 +13,18 @@ module Crystal::Scatter |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
class SliceInfo |
|
|
|
JSON.mapping( |
|
|
|
slices: Array(Array(Slice)), |
|
|
|
last: Int32, |
|
|
|
shards: UInt32 |
|
|
|
) |
|
|
|
|
|
|
|
def initialize(@slices, @last, @shards) |
|
|
|
|
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
class MetaRing |
|
|
|
getter rings : Array(Ring) |
|
|
|
getter ring_graph : RingGraph |
|
|
@ -44,12 +57,31 @@ module Crystal::Scatter |
|
|
|
return tag |
|
|
|
end |
|
|
|
|
|
|
|
private def hash_and_split(data) |
|
|
|
h = hash_impl data |
|
|
|
space=UInt64::MAX/@shards |
|
|
|
targets = Array(UInt64).new |
|
|
|
(1..@shards).each do |
|
|
|
targets << h |
|
|
|
h+=space |
|
|
|
end |
|
|
|
return targets |
|
|
|
end |
|
|
|
|
|
|
|
private def get_slice_from_hash(ring : Ring, h : UInt64) |
|
|
|
value = ring.find do |slice| |
|
|
|
h>=slice.s_begin && slice.s_end>=h |
|
|
|
end |
|
|
|
return value.not_nil! |
|
|
|
end |
|
|
|
|
|
|
|
private def unfold_ring(ring : Ring, targets : Array(UInt64)) |
|
|
|
slices = Array(Slice).new |
|
|
|
targets.each do |h| |
|
|
|
slices << get_slice_from_hash(ring, h) |
|
|
|
end |
|
|
|
return slices |
|
|
|
end |
|
|
|
|
|
|
|
def add(element : Daemon) |
|
|
|
@lock.synchronize do |
|
|
@ -78,44 +110,25 @@ module Crystal::Scatter |
|
|
|
ring? = @rings.last |
|
|
|
end |
|
|
|
ring = ring?.not_nil! |
|
|
|
h = hash_impl data |
|
|
|
space=UInt64::MAX/@shards |
|
|
|
targets = Array(UInt64).new |
|
|
|
(1..@shards).each do |
|
|
|
targets << h |
|
|
|
h+=space |
|
|
|
end |
|
|
|
slices = Array(Slice).new |
|
|
|
targets.each do |h| |
|
|
|
slices << get_slice_from_hash(ring, h) |
|
|
|
end |
|
|
|
targets = hash_and_split(data) |
|
|
|
slices = unfold_ring(ring,targets) |
|
|
|
return slices |
|
|
|
end |
|
|
|
|
|
|
|
def get_slices_for(data : String, n_last o">= 1 : Int32) |
|
|
|
def get_slices_for(data : String, n_last : UInt32) |
|
|
|
n_rings = Array(Ring).new |
|
|
|
sllast = 0 |
|
|
|
@lock.synchronize do |
|
|
|
sllast = @rings.size - 1 |
|
|
|
t = @rings.size-n_last |
|
|
|
n_rings = @rings.skip( t > 0 ? t : 1 ) |
|
|
|
n_rings = @rings.skip( t >= 0 ? t : 1 ) |
|
|
|
end |
|
|
|
h = hash_impl data |
|
|
|
space=UInt64::MAX/@shards |
|
|
|
targets = Array(UInt64).new |
|
|
|
(1..@shards).each do |
|
|
|
targets << h |
|
|
|
h+=space |
|
|
|
end |
|
|
|
|
|
|
|
targets = hash_and_split(data) |
|
|
|
ret = Array(Array(Slice)).new |
|
|
|
n_rings.each do |ring| |
|
|
|
slices = Array(Slice).new |
|
|
|
targets.each do |h| |
|
|
|
slices << get_slice_from_hash(ring, h) |
|
|
|
end |
|
|
|
ret << slices |
|
|
|
ret << unfold_ring(ring,targets) |
|
|
|
end |
|
|
|
return ret |
|
|
|
return SliceInfo.new ret, sllast, @shards |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
end |