396 lines
15 KiB
Python
396 lines
15 KiB
Python
from collections import OrderedDict
|
|
from http import HTTPStatus
|
|
from io import BytesIO
|
|
import hashlib
|
|
import json
|
|
import qrcode
|
|
|
|
from asgiref.sync import async_to_sync
|
|
|
|
from channels.generic.http import AsyncHttpConsumer
|
|
from channels.layers import get_channel_layer
|
|
|
|
from django.db import transaction
|
|
from django.http import HttpResponse
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.views.decorators.http import require_safe, require_http_methods,\
|
|
require_POST
|
|
from django.urls import reverse
|
|
|
|
from .forms import NewGameForm, JoinGameForm, PlayerFormSet
|
|
from .models import Game, Player, Fear
|
|
|
|
|
|
def lookup_access_code(func):
|
|
def with_game(request, access_code, *args, **kwargs):
|
|
game = get_object_or_404(Game, access_code=access_code.lower())
|
|
return func(request, game, *args, **kwargs)
|
|
|
|
return with_game
|
|
|
|
|
|
@require_safe
|
|
def index(request):
|
|
return render(request, 'index.html')
|
|
|
|
|
|
@require_http_methods(["HEAD", "GET", "POST"])
|
|
def enter_code(request):
|
|
if request.method == 'POST':
|
|
form = JoinGameForm(request.POST)
|
|
if form.is_valid():
|
|
game = form.cleaned_data['game']
|
|
return redirect('game', access_code=game.access_code)
|
|
else:
|
|
form = JoinGameForm()
|
|
|
|
return render(request, 'enter_code.html', {'form': form})
|
|
|
|
|
|
@require_http_methods(["HEAD", "GET", "POST"])
|
|
def new_game(request):
|
|
if request.method == 'POST':
|
|
form = NewGameForm(request.POST)
|
|
formset = PlayerFormSet(request.POST)
|
|
if form.is_valid() and formset.is_valid():
|
|
player_forms = list(filter(lambda p: p.cleaned_data.get('name'),
|
|
formset.forms))
|
|
num_players = len(player_forms)
|
|
game = Game()
|
|
game.combined_growth_spirit =\
|
|
form.cleaned_data.get('combined_growth_spirit')
|
|
game.enable_events = form.cleaned_data.get('enable_events')
|
|
game.combined_event = form.cleaned_data.get('combined_event')
|
|
game.england_build = form.cleaned_data.get('england_build')
|
|
game.fear_per_card =\
|
|
form.cleaned_data.get('fear_per_player') * num_players
|
|
game.save()
|
|
|
|
for player_form in player_forms:
|
|
player = Player()
|
|
player.order = int(player_form.prefix[5:]) + 1
|
|
player.spirit = player_form.cleaned_data.get('spirit')
|
|
player.name = player_form.cleaned_data.get('name')
|
|
player.game = game
|
|
player.save()
|
|
|
|
game.advance_phase()
|
|
|
|
return redirect('game', access_code=game.access_code)
|
|
else:
|
|
form = NewGameForm()
|
|
|
|
initial_player_data = [{'spirit': i}
|
|
for (i, _) in Player.enumerate_spirit_names()]
|
|
formset = PlayerFormSet(initial=initial_player_data)
|
|
for player_form in formset:
|
|
player_form.player_id = int(player_form.prefix[5:]) + 1
|
|
|
|
return render(request, 'new_game.html',
|
|
{'form': form, 'formset': formset})
|
|
|
|
|
|
@require_safe
|
|
def qr_code(request, access_code):
|
|
join_url = reverse('game', kwargs={'access_code': access_code.lower()})
|
|
join_url = request.build_absolute_uri(join_url)
|
|
img = qrcode.make(join_url)
|
|
output = BytesIO()
|
|
img.save(output, "PNG")
|
|
return HttpResponse(output.getvalue(), content_type='image/png')
|
|
|
|
|
|
def load_players(game):
|
|
players = OrderedDict()
|
|
for player in game.player_set.order_by('order').all():
|
|
player.total_fear = 0
|
|
player.fear = OrderedDict()
|
|
players[player.order] = player
|
|
|
|
return players
|
|
|
|
|
|
def get_players_with_fear(game, current_phase=None, players=None):
|
|
if players is None:
|
|
players = load_players(game)
|
|
if current_phase is None:
|
|
current_phase = game.get_current_phase()
|
|
|
|
for fear in current_phase.fear_set.order_by('effect').all():
|
|
players[fear.player.order].fear[fear.effect] = {
|
|
'pure_fear': fear.pure_fear,
|
|
'towns': fear.towns_destroyed,
|
|
'cities': fear.cities_destroyed,
|
|
}
|
|
players[fear.player.order].total_fear +=\
|
|
fear.pure_fear + fear.towns_destroyed + 2*fear.cities_destroyed
|
|
|
|
return players
|
|
|
|
|
|
def game_status_object(game, current_phase=None, players_with_fear=None):
|
|
if current_phase is None:
|
|
current_phase = game.get_current_phase()
|
|
if players_with_fear is None:
|
|
players_with_fear = get_players_with_fear(game, current_phase)
|
|
|
|
players = OrderedDict()
|
|
for order, player in players_with_fear.items():
|
|
players[order] = {
|
|
'ready': player.ready,
|
|
'fear': player.fear,
|
|
'total_fear': player.total_fear
|
|
}
|
|
|
|
status_obj = {
|
|
'turn': game.game_turn,
|
|
'phase': game.get_current_phase_name(),
|
|
'phase_id': game.game_phase,
|
|
'available_fear_cards': game.num_available_fear_cards(),
|
|
'fear_to_next_card': game.get_fear_to_next_card(),
|
|
'fear_this_phase': current_phase.fear_this_phase(),
|
|
'total_fear': game.get_current_total_fear(),
|
|
'players': players,
|
|
'all_ready': all(player['ready'] for player in players.values()),
|
|
}
|
|
h = hashlib.sha256()
|
|
h.update(json.dumps(status_obj).encode('utf-8'))
|
|
status_hash = h.hexdigest()
|
|
status_obj['hash'] = status_hash
|
|
|
|
return status_obj
|
|
|
|
|
|
@transaction.atomic
|
|
@lookup_access_code
|
|
@require_POST
|
|
def update_game(request, game):
|
|
return handle_game_request(request, game, update=True)
|
|
|
|
|
|
@transaction.atomic
|
|
@lookup_access_code
|
|
@require_http_methods(["HEAD", "GET", "POST"])
|
|
def game(request, game):
|
|
return handle_game_request(request, game, update=False)
|
|
|
|
|
|
def handle_game_request(request, game, update):
|
|
# TODO Should check if game is over and redirect to summary table view.
|
|
|
|
players = load_players(game)
|
|
for player in players.values():
|
|
player.changed_to_ready = False
|
|
|
|
errors = []
|
|
fear_kind_dict = {
|
|
'fear': 'pure_fear',
|
|
'towns': 'towns_destroyed',
|
|
'cities': 'cities_destroyed',
|
|
}
|
|
current_value = None
|
|
current_phase = game.get_current_phase()
|
|
if request.method == 'POST':
|
|
post_data = request.POST
|
|
|
|
correct_phase =\
|
|
current_phase.game_turn == int(post_data['game_turn'])\
|
|
and current_phase.game_phase == int(post_data['game_phase'])
|
|
advance_phase = 'advance' in post_data
|
|
revert_phase = 'revert' in post_data
|
|
for key, value in post_data.items():
|
|
sections = key.split('-')
|
|
if key in ['csrfmiddlewaretoken',
|
|
'update', 'advance', 'revert',
|
|
'game_turn', 'game_phase', 'phase_name']\
|
|
or sections[-1] == 'orig':
|
|
pass
|
|
elif sections[0] == 'player':
|
|
orig_value = post_data.get(key + '-orig', None)
|
|
player_ord = int(sections[1])
|
|
player = players[player_ord]
|
|
if sections[2] == 'visible':
|
|
player.visible = True
|
|
elif sections[2] == 'ready':
|
|
if correct_phase and value != orig_value:
|
|
if value == 'true':
|
|
player.ready = True
|
|
player.changed_to_ready = True
|
|
else:
|
|
player.ready = False
|
|
player.save()
|
|
elif sections[2] == 'effect':
|
|
effect_num = int(sections[3])
|
|
effect_kind = sections[4]
|
|
if orig_value != value:
|
|
amount = int(value)
|
|
orig = int(orig_value)
|
|
if amount != orig:
|
|
if player.ready and not player.changed_to_ready:
|
|
errors.append(
|
|
f"{player.name} " +
|
|
f"({player.get_spirit_name()}) was " +
|
|
"marked as ready by another user, so " +
|
|
"your attempt to change their effect " +
|
|
f"#{effect_num} {effect_kind} " +
|
|
f"from {orig} to {amount} " +
|
|
"was not processed.")
|
|
elif not correct_phase or advance_phase:
|
|
old_phase = post_data['phase_name']
|
|
errors.append(
|
|
f"{player.name} " +
|
|
f"({player.get_spirit_name()})'s " +
|
|
f"#{effect_num} {effect_kind} " +
|
|
"could not be changed from " +
|
|
f"{orig} to {amount} " +
|
|
f"because {old_phase} has already ended.")
|
|
else:
|
|
fear_row, created = Fear.objects.get_or_create(
|
|
phase=current_phase,
|
|
player=player,
|
|
effect=effect_num)
|
|
prop_name = fear_kind_dict[effect_kind]
|
|
current_value = getattr(fear_row, prop_name)
|
|
if current_value == orig:
|
|
setattr(fear_row, prop_name, amount)
|
|
fear_row.save()
|
|
else:
|
|
errors.append(
|
|
f"{player.name} " +
|
|
f"({player.get_spirit_name()})'s " +
|
|
"effect " +
|
|
f"#{effect_num} {effect_kind} " +
|
|
"was changed by another user to " +
|
|
f"{current_value}, so " +
|
|
"your attempt to change it from " +
|
|
f"{orig} to {amount} " +
|
|
"was not processed.")
|
|
else:
|
|
raise Exception(key + ": " + value)
|
|
else:
|
|
raise Exception(key + ": " + value)
|
|
if advance_phase:
|
|
players_ready = all(player.ready for player in players.values())
|
|
if not players_ready:
|
|
errors.append("Cannot advance to next phase until " +
|
|
"all players are done with the current phase.")
|
|
else:
|
|
current_phase = game.advance_phase()
|
|
for player in players.values():
|
|
player.ready = False
|
|
elif revert_phase:
|
|
current_phase = game.revert_phase()
|
|
for player in players.values():
|
|
player.ready = True
|
|
|
|
if update:
|
|
if errors:
|
|
res = {
|
|
'success': False,
|
|
'errors': '\n'.join(errors)
|
|
}
|
|
if current_value is not None:
|
|
res['value'] = current_value
|
|
else:
|
|
res = {'success': True}
|
|
async_to_sync(get_channel_layer().group_send)(
|
|
"%s_status" % game.access_code,
|
|
{"type": "fear_tracker.invalidate_status"})
|
|
return HttpResponse(json.dumps(res))
|
|
|
|
players = get_players_with_fear(game, current_phase, players)
|
|
status_obj = game_status_object(game, current_phase, players)
|
|
status_string = json.dumps(status_obj)
|
|
|
|
async_to_sync(get_channel_layer().group_send)(
|
|
"%s_status" % game.access_code, {
|
|
"type": "fear_tracker.hashcode_seen",
|
|
"hashcode": status_obj['hash'],
|
|
"status_string": status_string,
|
|
})
|
|
|
|
for player in players.values():
|
|
info = player.fear
|
|
if not info:
|
|
new_effect = 0
|
|
else:
|
|
new_effect = max(info.keys()) + 1
|
|
info[new_effect] = {
|
|
'pure_fear': 0,
|
|
'towns': 0,
|
|
'cities': 0,
|
|
}
|
|
|
|
return render(request, 'game.html', {
|
|
'access_code': game.access_code,
|
|
'status': status_obj,
|
|
'status_json': status_string,
|
|
'players': players,
|
|
'range': range(21),
|
|
'errors': errors,
|
|
})
|
|
|
|
|
|
@lookup_access_code
|
|
@require_safe
|
|
def status(request, game, hashcode=None):
|
|
status_obj = game_status_object(game)
|
|
if hashcode == status_obj['hash']:
|
|
return HttpResponse(status=HTTPStatus.NOT_MODIFIED)
|
|
else:
|
|
status_string = json.dumps(status_obj)
|
|
async_to_sync(get_channel_layer().group_send)(
|
|
"%s_status" % game.access_code, {
|
|
"type": "fear_tracker.hashcode_seen",
|
|
"hashcode": status_obj['hash'],
|
|
"status_string": status_string,
|
|
})
|
|
return HttpResponse(status_string)
|
|
|
|
|
|
class StatusLongPollConsumer(AsyncHttpConsumer):
|
|
async def handle(self, body):
|
|
self.access_code = self.scope["url_route"]["kwargs"]["access_code"]
|
|
self.hashcode = self.scope["url_route"]["kwargs"]["hashcode"]
|
|
|
|
await self.channel_layer.group_add("%s_status" % self.access_code,
|
|
self.channel_name)
|
|
await self.channel_layer.group_send(
|
|
"%s_status" % self.access_code, {
|
|
"type": "fear_tracker.hashcode_seen",
|
|
"hashcode": self.hashcode,
|
|
})
|
|
|
|
async def http_request(self, message):
|
|
"""
|
|
Async entrypoint - concatenates body fragments and hands off control
|
|
to ``self.handle`` when the body has been completely received.
|
|
"""
|
|
if "body" in message:
|
|
self.body.append(message["body"])
|
|
if not message.get("more_body"):
|
|
await self.handle(b"".join(self.body))
|
|
|
|
async def disconnect(self):
|
|
await self.channel_layer.group_discard("%s_status" % self.access_code,
|
|
self.channel_name)
|
|
|
|
async def fear_tracker_hashcode_seen(self, event):
|
|
if self.hashcode != event["hashcode"]:
|
|
if "status_string" in event and event["status_string"]:
|
|
body = event["status_string"].encode('utf-8')
|
|
await self.send_response(200, body)
|
|
await self.disconnect()
|
|
await self.channel_layer.group_send(
|
|
"%s_status" % self.access_code, {
|
|
"type": "fear_tracker.invalidate_status",
|
|
})
|
|
|
|
async def fear_tracker_invalidate_status(self, event):
|
|
no_hash_status = reverse('status',
|
|
kwargs={'access_code': self.access_code})
|
|
await self.send_response(302, b'', headers=[
|
|
(b"Location", no_hash_status.encode('utf-8'))
|
|
])
|
|
await self.http_disconnect(None)
|